Source code for spylunking.scripts.search_splunk

#!/usr/bin/env python

"""
A tool for searching splunk with python - spylunking

Examples
--------

Please use these environment variables to publish logs and run searches
against a local or remote Splunk server:

::

    export SPLUNK_ADDRESS="splunkenterprise:8088"
    export SPLUNK_API_ADDRESS="splunkenterprise:8089"
    export SPLUNK_PASSWORD="123321"
    export SPLUNK_USER="trex"
    export SPLUNK_TOKEN="<Optional pre-existing Splunk token>"
    export SPLUNK_INDEX="<splunk index>"

Pull Logs with a Query on the Command Line
==========================================

::

    sp -q 'index="antinex" AND levelname=INFO | head 10 | reverse' \
        -u trex -p 123321 -a splunkenterprise:8089

Get CRITICAL logs
=================

::

    sp -q 'index="antinex" AND levelname="CRITICAL" | reverse'

Get First 10 ERROR logs
=======================

::

    sp -q 'index="antinex" AND levelname="ERROR" | head 10 | reverse' \
        -u trex -p 123321 -a splunkenterprise:8089
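
Get Logs as JSON Dictionaries
=============================

An example using the optional -j argument (defined in the argument
parser below) to print each matching log as a pretty-printed JSON
dictionary instead of the default log-line view:

::

    sp -q 'index="antinex" AND levelname="ERROR" | head 10 | reverse' \
        -u trex -p 123321 -a splunkenterprise:8089 -j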

"""

import sys
import json
import argparse
import datetime
from spylunking.log.setup_logging import \
    simple_logger
import spylunking.search as sp
from spylunking.ev import ev
from spylunking.ppj import ppj
from spylunking.consts import SUCCESS
from spylunking.consts import SPLUNK_USER
from spylunking.consts import SPLUNK_PASSWORD
from spylunking.consts import SPLUNK_TOKEN
from spylunking.consts import SPLUNK_API_ADDRESS
from spylunking.consts import SPLUNK_INDEX
from spylunking.consts import SPLUNK_VERBOSE


log = simple_logger()

"""
Additional optional argument values for simple_logger
=====================================================

::

    # show dates on all logs
    handler_name=None
    # same as None - shows dates on all logs
    handler_name='colors'
    # debug authentication issues during
    # functional integration testing
    splunk_debug=True
    splunk_user=os.getenv(
        'SPLUNK_USER',
        None)
    splunk_password=os.getenv(
        'SPLUNK_PASSWORD',
        None)
    splunk_address=os.getenv(
        'SPLUNK_API_ADDRESS',
        'localhost:8089')
    splunk_token=os.getenv(
        'SPLUNK_TOKEN',
        None)
"""

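"""
Example simple_logger usage
===========================

A minimal sketch, not executed by this module, assuming simple_logger
accepts the optional keyword arguments listed above:

::

    import os
    from spylunking.log.setup_logging import simple_logger

    log = simple_logger(
        # same as None - shows dates on all logs
        handler_name='colors',
        # debug authentication issues during
        # functional integration testing
        splunk_debug=True,
        splunk_user=os.getenv(
            'SPLUNK_USER',
            None),
        splunk_password=os.getenv(
            'SPLUNK_PASSWORD',
            None),
        splunk_address=os.getenv(
            'SPLUNK_API_ADDRESS',
            'localhost:8089'),
        splunk_token=os.getenv(
            'SPLUNK_TOKEN',
            None))
"""
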

def show_search_results(
        log_rec,
        code_view=True,
        json_view=False,
        show_message_details=False):
    """show_search_results

    Show each search result either as an rsyslog-style log line or as a
    pretty-printed JSON dictionary for debugging drill-down fields

    :param log_rec: log record from splunk
    :param code_view: show as a normal tail -f <log file> view
    :param json_view: pretty print each log's dictionary
    :param show_message_details: include the dc, env, source path and
        line number fields in each log line
    """

    log_dict = None
    try:
        log_dict = json.loads(
            log_rec)
    except Exception as e:
        log.error((
            'Failed to parse log record={} with ex={}').format(
                log_rec,
                e))
        return
    # end of try/ex

    if not log_dict:
        log.error((
            'Failed to parse log_rec={} as a dictionary').format(
                log_rec))
        return

    if code_view:
        comp_name = log_dict.get(
            'name',
            '')
        logger_name = log_dict.get(
            'logger_name',
            '')
        # prefer logger_name and fall back to the component name
        use_log_name = logger_name if logger_name else comp_name

        prefix_log = (
            '{} {} - {} -').format(
                log_dict.get(
                    'systime',
                    log_dict.get(
                        'asctime',
                        '')),
                use_log_name,
                log_dict.get(
                    'levelname',
                    ''))

        suffix_log = ''
        if log_dict.get(
                'exc',
                ''):
            suffix_log = (
                '{} exc={}').format(
                    suffix_log,
                    log_dict.get(
                        'exc',
                        ''))

        if show_message_details:
            suffix_log = (
                'dc={} env={} '
                'source={} line={}').format(
                    log_dict.get(
                        'dc',
                        ''),
                    log_dict.get(
                        'env',
                        ''),
                    log_dict.get(
                        'path',
                        ''),
                    log_dict.get(
                        'lineno',
                        ''))

        msg = (
            '{} {} {}').format(
                prefix_log,
                log_dict.get(
                    'message',
                    ''),
                suffix_log)

        if log_dict['levelname'] == 'INFO':
            log.info(msg)
        elif log_dict['levelname'] == 'DEBUG':
            log.debug(msg)
        elif log_dict['levelname'] == 'ERROR':
            log.error(msg)
        elif log_dict['levelname'] == 'CRITICAL':
            log.critical(msg)
        elif log_dict['levelname'] == 'WARNING':
            log.warning(msg)
        else:
            log.debug(msg)
    elif json_view:
        if log_dict['levelname'] == 'INFO':
            log.info(ppj(log_dict))
        elif log_dict['levelname'] == 'DEBUG':
            log.debug(ppj(log_dict))
        elif log_dict['levelname'] == 'ERROR':
            log.error(ppj(log_dict))
        elif log_dict['levelname'] == 'CRITICAL':
            log.critical(ppj(log_dict))
        elif log_dict['levelname'] == 'WARNING':
            log.warning(ppj(log_dict))
        else:
            log.debug(ppj(log_dict))
    else:
        log.error((
            'Please use either code_view or json_view to view the logs'))
    # end of handling different log view presentation types

# end of show_search_results
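
"""
Example show_search_results usage
=================================

A minimal sketch of how show_search_results is called - log_rec is a
JSON string (the _raw field of a Splunk search result) and the field
values below are illustrative placeholders:

::

    # illustrative record - field names match the ones read above
    sample_rec = json.dumps({
        'logger_name': 'payments',
        'asctime': '2018-06-25 12:00:00,000',
        'levelname': 'INFO',
        'message': 'hello from splunk'})

    show_search_results(
        log_rec=sample_rec,
        code_view=True,
        json_view=False,
        show_message_details=False)
"""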


def show_non_search_results(
        log_rec,
        code_view=True,
        json_view=False,
        show_message_details=False):
    """show_non_search_results

    Show non-search results for search jobs like:
    ``index="antinex" | stats count``

    :param log_rec: log record from splunk
    :param code_view: show as a normal tail -f <log file> view
    :param json_view: pretty print each log's dictionary
    :param show_message_details: show additional message fields
        (unused here - kept for signature parity with show_search_results)
    """

    log_dict = None
    try:
        log_dict = json.loads(log_rec)
    except Exception:
        log_dict = None
    # end of try/ex

    if not log_dict:
        log.info(ppj(log_rec))
    else:
        log.info(ppj(log_dict))
# end of show_non_search_results
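
"""
Example show_non_search_results usage
=====================================

A minimal sketch for aggregate search jobs such as
index="antinex" | stats count - the record below is an illustrative
placeholder for a result row that has no _raw field:

::

    show_non_search_results(
        log_rec={'count': '10'},
        code_view=True,
        json_view=False,
        show_message_details=False)
"""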


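"""
Example of the underlying search call
=====================================

A minimal sketch of the spylunking.search.search call that run_main
makes below - the argument names and the status/record/err keys in the
response dictionary are taken from that call, and the credentials shown
are placeholders:

::

    import spylunking.search as sp
    from spylunking.consts import SUCCESS

    res = sp.search(
        user='trex',
        password='123321',
        address='splunkenterprise:8089',
        token=None,
        query_dict={
            'search': 'search index="antinex" | head 10 | reverse'
        },
        verify=False)

    if res['status'] == SUCCESS:
        for log_record in res['record'].get('results', []):
            # raw log line if present, otherwise the full result row
            print(log_record.get('_raw', log_record))
    else:
        print(res['err'])
"""

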
def run_main():
    """run_main

    Search Splunk
    """

    parser = argparse.ArgumentParser(
        description='Search Splunk')
    parser.add_argument(
        '-u', help='username',
        required=False, dest='user')
    parser.add_argument(
        '-p', help='user password',
        required=False, dest='password')
    parser.add_argument(
        '-f', help='splunk-ready request in a json file',
        required=False, dest='datafile')
    parser.add_argument(
        '-i', help='index to search',
        required=False, dest='index_name')
    parser.add_argument(
        '-a', help='host address: <fqdn:port>',
        required=False, dest='address')
    parser.add_argument(
        '-e', help='(Optional) earliest_time minutes back',
        required=False, dest='earliest_time_minutes')
    parser.add_argument(
        '-l', help='(Optional) latest_time minutes back',
        required=False, dest='latest_time_minutes')
    parser.add_argument(
        '-q', '--queryargs', nargs='*',
        help=(
            'query string for searching splunk: '
            'search index="antinex" AND levelname="ERROR"'),
        required=False, dest='query_args')
    parser.add_argument(
        '-j', help='(Optional) view as json dictionary logs',
        required=False, dest='json_view', action='store_true')
    parser.add_argument(
        '-t',
        help=(
            '(Optional) pre-existing Splunk token '
            'which can be set using export '
            'SPLUNK_TOKEN=<token> - if provided '
            'the user (-u) and password (-p) '
            'arguments are not required'),
        required=False, dest='token')
    parser.add_argument(
        '-m', help='(Optional) verbose message when getting logs',
        required=False, dest='message_details', action='store_true')
    parser.add_argument(
        '-v', help='(Optional) verify certs - disabled by default',
        required=False, dest='verify', action='store_true')
    parser.add_argument(
        '-b', help='verbose',
        required=False, dest='verbose', action='store_true')
    args = parser.parse_args()

    user = SPLUNK_USER
    password = SPLUNK_PASSWORD
    token = SPLUNK_TOKEN
    address = SPLUNK_API_ADDRESS
    index_name = SPLUNK_INDEX
    verbose = SPLUNK_VERBOSE
    show_message_details = bool(
        str(ev('MESSAGE_DETAILS', '0')).lower() == '1')
    earliest_time_minutes = None
    latest_time_minutes = None
    verify = False
    code_view = True
    json_view = False
    datafile = None

    if args.user:
        user = args.user
    if args.password:
        password = args.password
    if args.address:
        address = args.address
    if args.datafile:
        datafile = args.datafile
    if args.index_name:
        index_name = args.index_name
    if args.verify:
        verify = args.verify
    if args.earliest_time_minutes:
        earliest_time_minutes = int(args.earliest_time_minutes)
    if args.latest_time_minutes:
        latest_time_minutes = int(args.latest_time_minutes)
    if args.verbose:
        verbose = True
    if args.message_details:
        show_message_details = args.message_details
    if args.token:
        token = args.token
    if args.json_view:
        json_view = True
        code_view = False

    default_search_query = 'index="{}" | head 10 | reverse'.format(
        index_name)
    search_query = ev(
        'SPLUNK_QUERY',
        default_search_query)
    if args.query_args:
        search_query = ' '.join(args.query_args)

    valid = True
    if not user or user == 'user-not-set':
        log.critical('missing user')
        valid = False
    if not password or password == 'password-not-set':
        log.critical('missing password')
        valid = False
    if not index_name:
        log.critical('missing splunk index')
        valid = False
    if token:
        # if the token is present,
        # then the user and the password are not required
        if not valid and index_name:
            valid = True
    if not valid:
        log.critical(
            'Please run with the following arguments:\n')
        log.error(
            '-u <username> -p <password> '
            '-i <index> -t <token if user and password not set> '
            '-a <host address as: fqdn:port>')
        log.critical(
            '\n'
            'Or you can export the following '
            'environment variables and retry the command: '
            '\n')
        log.error(
            'export SPLUNK_ADDRESS="splunkenterprise:8088"\n'
            'export SPLUNK_API_ADDRESS="splunkenterprise:8089"\n'
            'export SPLUNK_PASSWORD="123321"\n'
            'export SPLUNK_USER="trex"\n'
            'export SPLUNK_INDEX="antinex"\n'
            'export SPLUNK_TOKEN="<Optional pre-existing Splunk token>"\n')
        sys.exit(1)

    if verbose:
        log.info(
            'creating client user={} address={}'.format(
                user,
                address))

    last_msg = ''
    host = ''
    port = -1
    try:
        last_msg = 'Invalid address={}'.format(address)
        address_split = address.split(':')
        last_msg = (
            'Failed finding host in address={} '
            '- please use: -a <fqdn:port>').format(address)
        host = address_split[0]
        last_msg = (
            'Failed finding integer port in address={} '
            '- please use: -a <fqdn:port>').format(address)
        port = int(address_split[1])
    except Exception as e:
        log.error((
            'Failed to parse -a {} for the '
            'splunk host address: {} which threw an '
            'ex={}').format(
                address,
                last_msg,
                e))
        sys.exit(1)
    # end of try ex

    if verbose:
        log.info(
            'connecting {}@{}:{}'.format(
                user,
                host,
                port))

    req_body = None
    if datafile:
        if verbose:
            log.info(
                'loading request in datafile={}'.format(datafile))
        with open(datafile, 'r') as f:
            req_body = json.loads(f.read())

    earliest_time = None
    latest_time = None
    now = datetime.datetime.now()
    if earliest_time_minutes:
        min_15_ago = now - datetime.timedelta(
            minutes=earliest_time_minutes)
        earliest_time = min_15_ago.strftime(
            '%Y-%m-%dT%H:%M:%S.000-00:00')
    if latest_time_minutes:
        latest_time = (now - datetime.timedelta(
            minutes=latest_time_minutes)).strftime(
                '%Y-%m-%dT%H:%M:%S.000-00:00')

    # Step 2: Create a search job
    if not search_query.startswith('search'):
        search_query = 'search {}'.format(search_query)
    search_data = req_body
    if not search_data:
        search_data = {
            'search': search_query
        }
    if earliest_time:
        search_data['earliest_time'] = earliest_time
    if latest_time:
        search_data['latest_time'] = latest_time

    res = sp.search(
        user=user,
        password=password,
        address=address,
        token=token,
        query_dict=search_data,
        verify=verify)

    if res['status'] == SUCCESS:
        result_list = []
        try:
            result_list = res['record'].get(
                'results',
                result_list)
            if len(result_list) == 0:
                log.info((
                    'No matches for search={} '
                    'response={}').format(
                        ppj(search_data),
                        ppj(res['record'])))
        except Exception as e:
            result_list = []
            log.error((
                'Failed to find results for the query={} '
                'with ex={}').format(
                    ppj(search_data),
                    e))

        for ridx, log_record in enumerate(result_list):
            log_raw = log_record.get(
                '_raw',
                None)
            if log_raw:
                show_search_results(
                    log_rec=log_raw,
                    code_view=code_view,
                    json_view=json_view,
                    show_message_details=show_message_details)
            else:
                show_non_search_results(
                    log_rec=log_record,
                    code_view=code_view,
                    json_view=json_view,
                    show_message_details=show_message_details)
            # end of handling log record presentation as a view
        # end for all log records
    else:
        log.error((
            'Failed searching splunk with status={} and '
            'error: {}').format(
                res['status'],
                res['err']))
    # end of if job_id

    if verbose:
        log.info('done')
# end of run_main


if __name__ == '__main__':
    run_main()