"""
Including a handler derived from the original repository:
https://github.com/zach-taylor/splunk_handler
This version was built to fix issues seen
with multiple Celery worker processes.
Available environment variables:
::
export SPLUNK_HOST="<splunk host>"
export SPLUNK_PORT="<splunk port: 8088>"
export SPLUNK_API_PORT="<splunk port: 8089>"
export SPLUNK_ADDRESS="<splunk address host:port>"
export SPLUNK_API_ADDRESS="<splunk api address host:port>"
export SPLUNK_TOKEN="<splunk token>"
export SPLUNK_INDEX="<splunk index>"
export SPLUNK_SOURCE="<splunk source>"
export SPLUNK_SOURCETYPE="<splunk sourcetype>"
export SPLUNK_VERIFY="<verify certs on HTTP POST>"
export SPLUNK_TIMEOUT="<timeout in seconds>"
export SPLUNK_QUEUE_SIZE="<num msgs allowed in queue - 0=infinite>"
export SPLUNK_SLEEP_INTERVAL="<sleep in seconds per batch>"
export SPLUNK_RETRY_COUNT="<attempts per log to retry publishing>"
export SPLUNK_RETRY_BACKOFF="<cooldown in seconds per failed POST>"
export SPLUNK_DEBUG="<1 enable debug|0 off>"
"""
import sys
import atexit
import traceback
import multiprocessing
import threading
import json
import logging
import socket
import time
import requests
import spylunking.send_to_splunk as send_to_splunk
from threading import Timer
from spylunking.rnow import rnow
from spylunking.ppj import ppj
from spylunking.consts import SPLUNK_HOST
from spylunking.consts import SPLUNK_PORT
from spylunking.consts import SPLUNK_TOKEN
from spylunking.consts import SPLUNK_INDEX
from spylunking.consts import SPLUNK_SOURCE
from spylunking.consts import SPLUNK_SOURCETYPE
from spylunking.consts import SPLUNK_VERIFY
from spylunking.consts import SPLUNK_TIMEOUT
from spylunking.consts import SPLUNK_SLEEP_INTERVAL
from spylunking.consts import SPLUNK_RETRY_COUNT
from spylunking.consts import SPLUNK_RETRY_BACKOFF
from spylunking.consts import SPLUNK_QUEUE_SIZE
from spylunking.consts import SPLUNK_DEBUG
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
# Python 2/3 compatible queue import.
# BUGFIX: the py2 branch previously imported the py3 ``queue`` module,
# which does not exist on python 2 (the module is ``Queue`` there).
# ``sys.version_info`` is the canonical major-version check; slicing
# the ``sys.version`` string is fragile.
is_py2 = sys.version_info[0] == 2
if is_py2:
    from Queue import Queue  # noqa
else:
    from queue import Queue  # noqa
# For keeping track of running class instances
instances = []
# Called when application exit imminent (main thread ended / got kill signal)
@atexit.register
def perform_exit():
    """perform_exit

    atexit hook fired when the application is about to exit
    (main thread ended / got a kill signal). Shuts down every
    registered ``SplunkPublisher`` instance so worker processes
    that may be mid-request/sleep/block can exit cleanly. Tested
    on python 3 with Celery and single processes.
    """
    if SPLUNK_DEBUG:
        print('{} -------------------------------'.format(rnow()))
        print('{} splunkpub: atexit.register - start'.format(rnow()))
    all_ok = True
    for pub in instances:
        try:
            if SPLUNK_DEBUG:
                print('{} - shutting down instance={} - start'.format(
                    rnow(), pub))
            pub.shutdown()
            if SPLUNK_DEBUG:
                print('{} - shutting down instance={} - done'.format(
                    rnow(), pub))
        except Exception as e:
            all_ok = False
            if SPLUNK_DEBUG:
                print(
                    '{} - shutting down instance={} '
                    '- hit ex={} during shutdown'.format(
                        rnow(), pub, e))
    # end of per-instance shutdown loop
    if not all_ok and SPLUNK_DEBUG:
        print('{} Failed exiting'.format(rnow()))
    if SPLUNK_DEBUG:
        print('{} splunkpub: atexit.register - done'.format(rnow()))
        print('{} -------------------------------'.format(rnow()))
# end of perform_exit
def force_flush():
    """force_flush

    Ask every registered ``SplunkPublisher`` instance to flush its
    queue and publish anything buffered to Splunk.
    """
    if SPLUNK_DEBUG:
        print('{} -------------------------------'.format(rnow()))
        print('{} splunkpub: force_flush - start'.format(rnow()))
    all_ok = True
    for pub in instances:
        try:
            pub.force_flush()
        except Exception as e:
            all_ok = False
            if SPLUNK_DEBUG:
                print(
                    '{} - force_flush instance={} '
                    '- hit ex={}'.format(
                        rnow(), pub, e))
    # end of per-instance flush loop
    if not all_ok and SPLUNK_DEBUG:
        print('{} Failed flushing queues'.format(rnow()))
    if SPLUNK_DEBUG:
        print('{} splunkpub: force_flush - done'.format(rnow()))
        print('{} -------------------------------'.format(rnow()))
# end of force_flush
class SplunkPublisher(logging.Handler):
    """
    A logging handler to send logs to a Splunk Enterprise instance
    running the Splunk HTTP Event Collector.

    Originally inspired from the repository:
    https://github.com/zach-taylor/splunk_handler

    This class allows multiple processes like Celery workers
    to reliably publish logs to Splunk from inside of a Celery task
    """
    def __init__(
            self,
            host=None,
            port=None,
            address=None,
            token=None,
            index=None,
            hostname=None,
            source=None,
            sourcetype='text',
            verify=True,
            timeout=60,
            sleep_interval=2.0,
            queue_size=0,
            debug=False,
            retry_count=20,
            retry_backoff=2.0,
            run_once=False):
        """__init__

        Initialize the SplunkPublisher

        :param host: Splunk fqdn
        :param port: Splunk HEC Port 8088
        :param address: Splunk fqdn:8088 - overrides host and port
        :param token: Pre-existing Splunk token
        :param index: Splunk index
        :param hostname: hostname stamped on published log records
            (defaults to ``socket.gethostname()`` when not set)
        :param source: source for log records
        :param sourcetype: json
        :param verify: verify using certs
        :param timeout: HTTP request timeout in seconds
        :param sleep_interval: Flush the queue of logs interval in seconds
        :param queue_size: Queue this number of logs before dropping
            new logs with 0 is an infinite number of messages
        :param debug: debug the publisher
        :param retry_count: number of publish retries per log record
        :param retry_backoff: cooldown timer in seconds
        :param run_once: test flag for running this just one time
        """
        # register with the module-level list so the atexit hook and
        # module-level force_flush() can reach this instance
        global instances
        instances.append(self)
        logging.Handler.__init__(self)
        # every constructor argument falls back to its env-driven
        # default from spylunking.consts when the caller passes None
        self.host = host
        if self.host is None:
            self.host = SPLUNK_HOST
        self.port = port
        if self.port is None:
            self.port = SPLUNK_PORT
        # an explicit "host:port" address overrides host and port
        if address:
            address_split = address.split(':')
            self.host = address_split[0]
            self.port = int(address_split[1])
        self.token = token
        if self.token is None:
            self.token = SPLUNK_TOKEN
        self.index = index
        if self.index is None:
            self.index = SPLUNK_INDEX
        self.source = source
        if self.source is None:
            self.source = SPLUNK_SOURCE
        self.sourcetype = sourcetype
        if self.sourcetype is None:
            self.sourcetype = SPLUNK_SOURCETYPE
        self.verify = verify
        if self.verify is None:
            self.verify = SPLUNK_VERIFY
        self.timeout = timeout
        if self.timeout is None:
            self.timeout = SPLUNK_TIMEOUT
        self.sleep_interval = sleep_interval
        if self.sleep_interval is None:
            self.sleep_interval = SPLUNK_SLEEP_INTERVAL
        self.retry_count = retry_count
        if self.retry_count is None:
            self.retry_count = SPLUNK_RETRY_COUNT
        self.retry_backoff = retry_backoff
        if self.retry_backoff is None:
            self.retry_backoff = SPLUNK_RETRY_BACKOFF
        self.queue_size = queue_size
        if self.queue_size is None:
            self.queue_size = SPLUNK_QUEUE_SIZE
        # accumulated string of queued messages awaiting one POST
        self.log_payload = ''
        # worker Timer thread handle and its thread id
        self.timer = None
        self.tid = None
        # a Manager-backed queue so multiple processes (e.g. Celery
        # workers) can share it; note Manager() starts a helper process
        self.manager = multiprocessing.Manager()
        self.queue = self.manager.Queue(maxsize=self.queue_size)
        self.session = requests.Session()
        # cross-process shutdown coordination flags
        self.shutdown_event = multiprocessing.Event()
        self.shutdown_ack = multiprocessing.Event()
        self.already_done = multiprocessing.Event()
        self.testing = False
        self.shutdown_now = False
        self.run_once = run_once
        # counter used to rate-limit periodic debug output
        self.debug_count = 0
        self.debug = debug
        if SPLUNK_DEBUG:
            self.debug = True
            self.debug_log('starting debug mode')
        if hostname is None:
            self.hostname = socket.gethostname()
        else:
            self.hostname = hostname
        self.debug_log('preparing to override loggers')
        # prevent infinite recursion by silencing requests and urllib3 loggers
        logging.getLogger('requests').propagate = False
        logging.getLogger('urllib3').propagate = False
        # and do the same for ourselves
        logging.getLogger(__name__).propagate = False
        # disable all warnings from urllib3 package
        if not self.verify:
            requests.packages.urllib3.disable_warnings()
        # Set up automatic retry with back-off
        self.debug_log('preparing to create a Requests session')
        # NOTE(review): ``method_whitelist`` was renamed to
        # ``allowed_methods`` in urllib3 >= 1.26 and removed in 2.x —
        # confirm the pinned urllib3 version before upgrading
        retry = Retry(
            total=self.retry_count,
            backoff_factor=self.retry_backoff,
            method_whitelist=False,  # Retry for any HTTP verb
            status_forcelist=[500, 502, 503, 504])
        self.session.mount('https://', HTTPAdapter(max_retries=retry))
        self.start_worker_thread(
            sleep_interval=self.sleep_interval)
        self.debug_log((
            'READY init - sleep_interval={}').format(
                self.sleep_interval))
    # end of __init__
[docs] def emit(
self,
record):
"""emit
Emit handler for queue-ing message for
the helper thread to send to Splunk on the ``sleep_interval``
:param record: LogRecord to send to Splunk
https://docs.python.org/3/library/logging.html
"""
self.debug_log('emit')
try:
record = self.format_record(
record)
except Exception as e:
self.write_log(
'Exception in Splunk logging handler={}'.format(e))
self.write_log(
traceback.format_exc())
return
if self.sleep_interval > 0:
try:
self.debug_log('put in queue')
# Put log message into queue; worker thread will pick up
self.queue.put_nowait(
record)
except Exception as e:
self.write_log(
'log queue full - log data will be dropped.')
else:
# Flush log immediately; is blocking call
self.publish_to_splunk(
payload=record)
# end of emit
[docs] def start_worker_thread(
self,
sleep_interval=1.0):
"""start_worker_thread
Start the helper worker thread to publish queued messages
to Splunk
:param sleep_interval: sleep in seconds before reading from
the queue again
"""
# Start a worker thread responsible for sending logs
if self.sleep_interval > 0:
self.debug_log(
'starting worker thread')
self.timer = Timer(
sleep_interval,
self.perform_work)
self.timer.daemon = True # Auto-kill thread if main process exits
self.timer.start()
# end of start_worker_thread
[docs] def write_log(
self,
log_message):
"""write_log
Write logs to stdout
:param log_message: message to log
"""
print(('{} splunkpub {}'.format(
rnow(),
log_message)))
# end of write_log
[docs] def debug_log(
self,
log_message):
"""debug_log
Write logs that only show up in debug mode.
To turn on debugging with environment variables
please set this environment variable:
::
export SPLUNK_DEBUG="1"
:param log_message: message to log
"""
if self.debug:
print(('{} splunkpub DEBUG {}'.format(
rnow(),
log_message)))
# end of debug_log
# end of format_record
    def build_payload_from_queued_messages(
            self,
            use_queue,
            shutdown_event,
            triggered_by_shutdown=False):
        """build_payload_from_queued_messages

        Empty the queued messages by building a large ``self.log_payload``

        :param use_queue: queue holding the messages
        :param shutdown_event: shutdown event
        :param triggered_by_shutdown: called during shutdown
        :return: ``True`` when draining stopped normally (queue empty
            or shutdown detected); ``False`` when the payload hit the
            size cap and should be POSTed immediately
        """
        self.debug_log('build_payload - start')
        not_done = True
        while not_done:
            # bail out early if a shutdown was requested (unless this
            # drain IS the shutdown drain)
            if not triggered_by_shutdown and self.is_shutting_down(
                    shutdown_event=shutdown_event):
                self.debug_log(
                    'build_payload shutting down')
                return True
            # rate-limited periodic debug heartbeat (every 60 passes)
            self.debug_count += 1
            if self.debug_count > 60:
                self.debug_count = 0
                self.debug_log('build_payload tid={} queue={}'.format(
                    self.tid,
                    str(use_queue)))
            try:
                # blocking get with sleep_interval as the timeout; a
                # timeout raises and lands in the except branch below
                msg = use_queue.get(
                    block=True,
                    timeout=self.sleep_interval)
                self.log_payload = self.log_payload + msg
                if self.debug:
                    self.debug_log('{} got={}'.format(
                        self,
                        ppj(msg)))
                not_done = not self.queue_empty(
                    use_queue=use_queue)
            except Exception as e:
                if self.is_shutting_down(
                        shutdown_event=shutdown_event):
                    self.debug_log(
                        'helper was shut down '
                        'msgs in the queue may not all '
                        'have been sent')
                # a dead Manager process surfaces as pipe/file errors
                # on the proxy queue — re-raise so the caller stops;
                # NOTE(review): this string matching is the only exit
                # path for an empty queue when triggered_by_shutdown
                # is True — confirm the Manager teardown raises here
                if ('No such file or directory' in str(e)
                        or 'Broken pipe' in str(e)):
                    raise e
                elif ("object, typeid 'Queue' at" in str(e)
                        and "'__str__()' failed" in str(e)):
                    raise e
                not_done = True
            # end of getting log msgs from the queue
            if not triggered_by_shutdown and self.is_shutting_down(
                    shutdown_event=shutdown_event):
                self.debug_log(
                    'build_payload - already shutting down')
                return True
            # If the payload is getting very long,
            # stop reading and send immediately.
            # Current limit is 524288 bytes (512 KB)
            # NOTE(review): the shutdown clause below is effectively
            # dead — the check just above already returned True — so
            # only the size comparison can trigger this branch
            if (not triggered_by_shutdown and self.is_shutting_down(
                    shutdown_event=shutdown_event)
                    or len(self.log_payload) >= 524288):
                self.debug_log(
                    'payload maximum size exceeded, sending immediately')
                return False
        self.debug_log('build_payload - done')
        return True
    # end of build_payload_from_queued_messages
# end of perform_work
    def publish_to_splunk(
            self,
            payload=None):
        """publish_to_splunk

        Build the ``self.log_payload`` from the queued log messages
        and POST it to the Splunk endpoint

        :param payload: string message to send to Splunk; when falsy
            the buffered ``self.log_payload`` is sent instead
        """
        self.debug_log('publish_to_splunk - start')
        use_payload = payload
        if not use_payload:
            use_payload = self.log_payload
        # NOTE(review): num_sent is reset on every call, so the
        # > 100000 rollover below can never fire — confirm whether
        # this was meant to be an instance-lifetime counter
        self.num_sent = 0
        if use_payload:
            url = 'https://{}:{}/services/collector'.format(
                self.host,
                self.port)
            self.debug_log('splunk url={}'.format(
                url))
            try:
                # in debug mode, pretty-print the payload when it is
                # HEC-style json; fall back to the raw string otherwise
                if self.debug:
                    try:
                        msg_dict = json.loads(use_payload)
                        event_data = json.loads(msg_dict['event'])
                        msg_dict['event'] = event_data
                        self.debug_log((
                            'sending payload: {}').format(
                                ppj(msg_dict)))
                    except Exception:
                        self.debug_log((
                            'sending data payload: {}').format(
                                use_payload))
                if self.num_sent > 100000:
                    self.num_sent = 1
                else:
                    self.num_sent += 1
                # POST via the retry-mounted session; retries/backoff
                # were configured in __init__
                send_to_splunk.send_to_splunk(
                    session=self.session,
                    url=url,
                    data=use_payload,
                    headers={'Authorization': 'Splunk {}'.format(
                        self.token)},
                    verify=self.verify,
                    timeout=self.timeout)
                self.debug_log('payload sent success')
            except Exception as e:
                try:
                    self.write_log(
                        'Exception in Splunk logging handler: {}'.format(
                            e))
                    self.write_log(traceback.format_exc())
                except Exception:
                    self.debug_log(
                        'Exception encountered,'
                        'but traceback could not be formatted')
            # the buffer is cleared even after a failed POST, so old
            # payloads are never re-sent (and never grow unbounded)
            self.log_payload = ''
        else:
            self.debug_log(
                'no logs to send')
        self.debug_log((
            'publish_to_splunk - done - '
            'self.is_shutting_down={} self.shutdown_now={}').format(
                self.is_shutting_down(shutdown_event=self.shutdown_event),
                self.shutdown_now))
    # end of publish_to_splunk
[docs] def queue_empty(
self,
use_queue):
"""queue_empty
:param use_queue: queue to test
"""
if hasattr(use_queue, 'empty'):
return use_queue.empty()
else:
return use_queue.qsize() == 0
# end of queue_empty
[docs] def force_flush(
self):
"""force_flush
Flush the queue and publish everything to Splunk
"""
self.debug_log('force flush - start')
self.publish_to_splunk()
self.debug_log('force flush - done')
# end of force_flush
[docs] def is_shutting_down(
self,
shutdown_event):
"""is_shutting_down
Determine if the parent is shutting down or this was
triggered to shutdown
:param shutdown_event: shutdown event
"""
return bool(
shutdown_event.is_set()
or self.shutdown_now
or self.run_once)
# end of is_shutting_down
[docs] def close(
self):
"""close"""
self.debug_log('close - start')
self.shutdown()
logging.Handler.close(self)
self.debug_log('close - done')
# end of close
[docs] def shutdown(
self):
"""shutdown"""
self.debug_log('shutdown - start')
# Only initiate shutdown once
if not self.shutdown_now:
self.debug_log('shutdown - still shutting down')
# Cancels the scheduled Timer, allows exit immediately
if self.timer:
self.timer.cancel()
self.timer = None
return
else:
self.debug_log('shutdown - start - setting instance shutdown')
self.shutdown_now = True
self.shutdown_event.set()
# if/else already shutting down
# Cancels the scheduled Timer, allows exit immediately
self.timer.cancel()
self.timer = None
self.debug_log(
'shutdown - publishing remaining logs')
if self.sleep_interval > 0:
try:
self.build_payload_from_queued_messages(
use_queue=self.queue,
shutdown_event=self.shutdown_event,
triggered_by_shutdown=True)
except Exception as e:
self.write_log((
'shutdown - failed to build a payload for remaining '
'messages in queue Exception shutting down '
'with ex={}').format(
e))
self.debug_log(
'publishing remaining logs')
# Send the remaining items in the queue
self.publish_to_splunk()
# end of try to publish remaining messages in the queue
# during shutdown
self.debug_log('shutdown - done')
# end of shutdown
# end of SplunkPublisher