SplunkStore_search.py

Used Splank module of Python

12

Votes

import splunklib.client as client
from splunklib.binding import AuthenticationError

import sys
import json

splunk_config = {
    "splunk":{
        "host": "localhost",
        "port": 8089,
        "username": "admin",
        "password": "changeme"
    },
    "index": "index_name"
}

class SplunkStore:
    def __init__(self):
        self._indexes = list()
        self._splunk_conf = splunk_config        
        self._index = self._splunk_conf["index"]
        self._connect()
        self.create()

    #fucntion connects to the splunks and fetches list of all indexex
    #accessible to that user and store it in self._indexes
    def _connect(self):
        try:
            self._service = client.connect(**self._splunk_conf["splunk"])
            for index in self._service.indexes:
                self._indexes.append(index.name)
        except AuthenticationError:
            print "Unable to authenticate user on Splunk"
        print "User successfully Logged in..."

    def create(self):
        if not self._index in self._indexes:
            self._service.indexes.create(self._index)
            print "Index created"
        else:
            print "Index already exists not creating..."
        index = self._service.indexes[self._index]
        return self._index

    #used for submitting the single event to the splunk
    def submit(self, data):
        print "Submitting single event to the splunk server"
        self._service.indexes[self._index].submit(data)

    #returns the stream socket to push series of data to splunk
    def attach(self, source, sourcetype):
        return self._service.indexes[self._index].attach(
            source = "nsg.dat",
            sourcetype = "_json" 
        )
    #creates a blocking search for the search query
    #add it to the list of jobs
    def search(self, search_query):
        #get the object from the job pool
        jobs = self._service.jobs
        #dict specifies what kind of search job it has to be
        search_kwargs_params = {
            "exec_mode": "blocking"
        }
        job = jobs.create(search_query, **search_kwargs_params)
        for result in results.ResultsReader(job.results()):
            yield json.dumps(result)


def main():
    ss = SplunkStore()
    with open("./data/dummy.dat") as f:
        content = json.load(f)

    splunk_socket = ss.attach(
        source = "my_local",
        sourcetype = "_json"
    )

    for json_object in content:
        splunk_socket.write(json.dumps(json_object))  

    for res in ss.search(
        'search index="cloudbit" source="nsg.dat"'
    ):
        print res


if __name__ == "__main__":
    main()

Vote Here

You must earn at least 1 vote on your snippets to be allowed to vote

Terms Of Use

Privacy Policy

Featured snippets are MIT license

Gears & Masters

Advertise

DevOpsnipp.com © 2020

medium.png