ifxlookup/ifxlookup.py

318 lines
12 KiB
Python
Raw Normal View History

#!bin/python3
# ifxlookup, a Python command-line client for infrastructure equipment.
# November 2019 by Daniel Dayley
import os
import sys
import requests
import argparse
import ipaddress
import jsonpath_ng
import socket
import inspect
import yaml
import json
import re
import importlib
# Make PyYAML's Dumper treat every object as non-aliasable, so objects that are
# referenced more than once are written out in full instead of as &anchor/*alias pairs.
yaml.Dumper.ignore_aliases = lambda *args : True
class IFXLookup():
    """Runs lookups for a list of subjects across a set of ServiceBase plugins.

    Plugins are registered with use_plugins(), configured with configure() (or
    load_config()), and queried with lookup(). Each registered plugin class is
    instantiated once; the instance is started lazily on the first lookup and
    shut down after each lookup unless caching is enabled.
    """
    DEFAULT_CONFIG = '~/.config/ifxlookup.yml'
    # Class-level defaults. The mutable containers are rebound per instance in
    # __init__ so that state is not shared between IFXLookup objects.
    _plugin_map = {}
    _delegate_map = {}
    _debug_level = 0
    _config = {}
    _caching = False
    _initialized_plugins = []

    def __init__(self, config=None):
        """Creates a lookup with no plugins, optionally loading a configuration.

        config may be a dict or a path to a YAML/JSON file (see load_config).
        """
        self._plugin_map = {}
        self._delegate_map = {}
        self._config = {}
        self._initialized_plugins = []
        if config:
            self.load_config(config=config)

    @staticmethod
    def _is_service_class(candidate):
        """Returns True if candidate is a class with a base named 'ServiceBase' in its MRO."""
        if not inspect.isclass(candidate):
            return False
        return any(base.__name__ == 'ServiceBase' for base in inspect.getmro(candidate))

    def __add_plugin(self, plugin, reload):
        """Adds a given ServiceBase plugin and instance, reinitializing one if it already exists and such is specified."""
        plugin_name = plugin.__module__.split('.')[-1]
        if not reload and plugin_name in self._plugin_map:
            return
        # We can't startup the plugin here because it hasn't been configured. We'll handle that at runtime.
        # Forget any initialized instance of the same name, forcing a reinitialization.
        stale = self._delegate_map.get(plugin_name)
        if stale is not None and stale in self._initialized_plugins:
            self._initialized_plugins.remove(stale)
        self._plugin_map[plugin_name] = plugin
        self._delegate_map[plugin_name] = plugin()

    def __init_once(self, instance):
        """Runs the startup function on the instance unless it is already marked as started."""
        if instance not in self._initialized_plugins:
            instance.startup()
            self._initialized_plugins.append(instance)

    def __shutdown_plugins(self):
        """Shuts down all plugins and marks them as needing startup again."""
        for instance in self._delegate_map.values():
            instance.shutdown()
        # Without this, a later lookup would skip startup() on plugins that were shut down.
        self._initialized_plugins.clear()

    def __namespace(self):
        """Generate map of ifxlookup runtime values that should be made available to plugins."""
        return {'debug': self._debug_level, 'caching': self._caching}

    def __filter_and_return(self, subjects, dataset, filter):
        """Reshapes a per-plugin dataset into a per-subject result and applies the filter.

        dataset maps plugin name -> {subject: data}, plus optional 'error' and
        'warn' entries, which are reported under the 'status' key. If filter (a
        JSONPath expression) is given, a list of the matching values is returned
        instead of the dictionary.
        """
        final_result = {}
        status = {key: dataset[key] for key in ('error', 'warn') if key in dataset}
        if status:
            final_result['status'] = status
        plugin_names = [name for name in dataset if name not in ('error', 'warn')]
        for subject in subjects:
            final_result[subject] = {name: dataset[name][subject] for name in plugin_names}
        # Apply Filter
        if filter:
            jsonpath_filter = jsonpath_ng.parse(filter)
            final_result = [match.value for match in jsonpath_filter.find(final_result)]
        return final_result

    def use_plugins(self, plugins, reload=False):
        """Defines plugins that should be used in a lookup, optionally forcing them to reset.

        plugins is a list whose items are either plugin names (resolved through
        search_plugins() in the default plugin directory) or classes that have a
        base named 'ServiceBase'. Returns self.

        Raises ValueError if plugins is not a list or an item is neither a name
        nor a ServiceBase class, and FileNotFoundError if a name cannot be resolved.
        """
        if not isinstance(plugins, list):
            raise ValueError('argument \'plugins\' should be of type list')
        available = None
        for plugin in plugins:
            if isinstance(plugin, str):
                # Scan the plugin directory lazily and only once per call.
                if available is None:
                    available = self.search_plugins()
                candidate = available.get(plugin)
                if candidate is None or not self._is_service_class(candidate):
                    raise FileNotFoundError(plugin + '.py not found')
                plugin = candidate
            elif not self._is_service_class(plugin):
                raise ValueError('unkown type for plugin')
            self.__add_plugin(plugin, reload)
        return self

    def get_plugins(self):
        """Returns a map of plugin names to the plugin instances configured for use in the lookup."""
        return self._delegate_map

    def search_plugins(self, directory=None):
        """Searches a given directory for compatible plugins and returns a map of available plugin names and classes.

        Every '*.py' file except 'servicebase.py' is imported; files that do not
        define a 'ServiceDelegate' attribute are skipped. When directory is not
        given, the 'plugins' directory next to this file is used.
        """
        # 'import importlib' alone does not guarantee that the util submodule is loaded.
        import importlib.util

        if not directory:
            directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'plugins')
        directory = os.path.normpath(os.path.expanduser(os.path.expandvars(directory)))
        name_map = {}
        for filename in os.listdir(directory):
            if not filename.endswith('.py') or filename == 'servicebase.py':
                continue
            name = filename.split('.')[0]
            spec = importlib.util.spec_from_file_location(name, os.path.join(directory, filename))
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            delegate = getattr(module, 'ServiceDelegate', None)
            if delegate is None:
                # Not a plugin module (e.g. a helper or __init__.py).
                continue
            name_map[name] = delegate
        return name_map

    def configure(self, config):
        """Updates the configuration of the lookup with the given config.

        Raises TypeError if config is not a dict.
        """
        if not isinstance(config, dict):
            raise TypeError('argument \'config\' should be of type dict')
        self._config.update(config)

    def load_config(self, config=None):
        """Loads configuration from a dict or from a YAML/JSON file and returns self.

        A dict is passed straight to configure(). Anything else is treated as a
        file path (DEFAULT_CONFIG when omitted) and parsed with yaml.safe_load.
        Raises TypeError (from configure) if the file does not contain a mapping.
        """
        if config is None:
            config = self.DEFAULT_CONFIG
        if isinstance(config, dict):
            self.configure(config)
            return self
        path = os.path.normpath(os.path.expanduser(os.path.expandvars(config)))
        with open(path) as file_handle:
            loaded = yaml.safe_load(file_handle)
        self.configure(loaded)
        return self

    def dump_config(self):
        """Returns the current configuration state. This likely contains sensitive information such as API keys."""
        return self._config

    def dump_config_to_file(self, path, print_json=False):
        """Exports the current plugin configuration to a file, as YAML or (with print_json) JSON."""
        # NOTE(review): the file is opened in append mode, so exporting to an existing
        # file adds a second document after the old content - confirm this is intended.
        with open(os.path.normpath(os.path.expanduser(os.path.expandvars(path))), 'a') as file_handle:
            if print_json:
                json.dump(self._config, file_handle)
            else:
                yaml.dump(self._config, file_handle, default_flow_style=False)

    def set_debug_level(self, level):
        """Sets the debug level; values that cannot be converted to int count as 0. Returns self."""
        try:
            level = int(level)
        except (TypeError, ValueError):
            level = 0
        self._debug_level = level
        return self

    def debug_level(self):
        """Gets the debug level"""
        return self._debug_level

    def enable_caching(self):
        """Enables results caching and disables auto-shutdown of plugins."""
        self._caching = True
        return self

    def disable_caching(self):
        """Disables results caching and enables auto-shutdown of plugins."""
        self._caching = False
        return self

    def lookup(self, subjects, filter=None):
        """Performs a search with the configured plugins and filter and returns the search data.

        subjects is a list of strings. The result maps each subject to
        {plugin name: plugin result}; plugin errors and warnings are reported under
        result['status']['error'] / ['warn'], keyed by plugin name. If filter (a
        JSONPath expression) is given, a list of matching values is returned instead.

        Raises KeyError if a registered plugin has no configuration and TypeError
        if subjects is not a list of strings.
        """
        # Assert that each plugin has a config
        for delegatename in self._delegate_map:
            if delegatename not in self._config:
                raise KeyError('no configuration provided for plugin ' + delegatename)
        # Assert that 'subjects' is a list
        if not isinstance(subjects, list):
            raise TypeError('expected list but got ' + str(type(subjects)))
        # Assert that subjects are strings
        for subject in subjects:
            if not isinstance(subject, str):
                raise TypeError('subjects must be of type \'str\'')
        hints = {}
        final_report = {}
        error_info = {}
        warn_info = {}
        for delegatename, delegate in self._delegate_map.items():
            # Add runtime info from ifxlookup to a copy, so the stored configuration
            # (and anything dumped from it) is not polluted with '_namespace'.
            delegateconfig = dict(self._config[delegatename])
            delegateconfig['_namespace'] = self.__namespace()
            # Configure and start the plugin
            delegate._set_config(delegateconfig)
            delegate._set_hints(hints)
            # Only runs if the plugin is not already started
            self.__init_once(delegate)
            report = {subject: delegate.lookup(subject) for subject in subjects}
            if delegate._error != []:
                error_info[delegatename] = delegate._error
            if delegate._warn != []:
                warn_info[delegatename] = delegate._warn
            if report:
                final_report[delegatename] = report
        # These keys must match the ones __filter_and_return treats as status entries.
        if error_info:
            final_report['error'] = error_info
        if warn_info:
            final_report['warn'] = warn_info
        # If caching is enabled prevent shutdown. Otherwise shutdown.
        if not self._caching:
            self.__shutdown_plugins()
        return self.__filter_and_return(subjects, final_report, filter)

    def finish(self):
        """In the case that caching has been enabled, this enables the plugins to be manually shut down."""
        self.__shutdown_plugins()
if __name__ == '__main__':
    # Command-line client: builds the argument parser from the available plugins,
    # loads a YAML/JSON configuration file, runs the lookup and prints the report.
    search = IFXLookup()
    available_plugins = search.search_plugins()
    # Define constants
    config_dir = os.path.expanduser('~/.config')
    config_filename = 'ifxlookup.yml'
    default_config_path = os.path.normpath(os.path.join(config_dir, config_filename))
    service_template = {'hosts':[],'username':'','key':''}
    config_template = {'bluecat':service_template,'f5':service_template,'paloalto':service_template,'aruba':service_template,'openvpn':service_template}
    config_error = {'config': 'There was a problem reading your config file, aborting.'}
    error_info = {}
    # Ask each plugin once for its command-line switch. Only well-formed
    # (short flag, long flag, action, help) specifications are used.
    plugin_arguments = {}
    for name, delegate in available_plugins.items():
        arguments = delegate().get_arguments()
        if arguments and len(arguments) == 4:
            plugin_arguments[name] = arguments
    # Gather Argument options
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', action='store', help='Specify a config file (~/.config/ifxlookup.yml by default)')
    parser.add_argument('-f', '--filter', action='store', default=None, help='Apply a JSONPath filter to the results')
    parser.add_argument('-j', '--json', action='store_true', help='Return results as a json object')
    # parser.add_argument('-l', '--link', action='store_true', help='Return physical link information about the subject')
    for arguments in plugin_arguments.values():
        parser.add_argument(arguments[0], arguments[1], action=arguments[2], help=arguments[3])
    parser.add_argument('-v', '--debug', action='count', help='Include debug information in the output. Add \'v\'s for more output.')
    parser.add_argument('-a', '--all', action='store_true', help='Return all searchable information about the subject')
    parser.add_argument('subjects', metavar='subjects', nargs='+', help='IP\'s, hostnames, MAC\'s, usernames, or other strings to look up.')
    args = parser.parse_args()
    # Load plugins based on submitted args
    if args.all:
        selected_plugins = list(available_plugins.values())
    else:
        selected_plugins = []
        for name, arguments in plugin_arguments.items():
            # argparse derives the attribute name from the long option, replacing '-' with '_'.
            dest = arguments[1].split('--')[-1].replace('-', '_')
            if getattr(args, dest, False):
                selected_plugins.append(available_plugins[name])
    search.use_plugins(selected_plugins)
    # Configure subjects
    subjects = list(args.subjects)
    # Load configuration
    # The command line configuration accepts YAML (or JSON) docs with required keys and values. The command
    # line tool generates a template at the default location if no config exists there.
    total_config = None
    if args.config:
        config_path = os.path.normpath(os.path.expanduser(os.path.expandvars(args.config)))
    else:
        config_path = default_config_path
        if not os.path.exists(config_path) or os.path.getsize(config_path) == 0:
            os.makedirs(os.path.normpath(config_dir), exist_ok=True)
            with open(config_path, 'w') as output:
                yaml.dump(config_template, output)
            error_info.update({'config': 'Config file ' + config_dir + '/' + config_filename + ' is empty, populating with a template.'})
    if not error_info:
        try:
            with open(config_path) as file_descriptor:
                raw_config = file_descriptor.read()
            # Try JSON first and fall back to YAML.
            try:
                total_config = json.loads(raw_config)
            except json.decoder.JSONDecodeError:
                total_config = yaml.safe_load(raw_config)
        except (OSError, yaml.YAMLError):
            error_info.update(config_error)
        else:
            # An empty or non-mapping document cannot hold per-plugin sections.
            if not isinstance(total_config, dict):
                error_info.update(config_error)
    if not error_info:
        for plugin_name in search.get_plugins():
            if plugin_name not in total_config:
                error_info.update({'config': 'Config file is missing information for plugin ' + plugin_name})
    # Stop on initialization error
    if error_info:
        print(subjects,{'error':error_info})
        sys.exit(1)
    search.configure(total_config)
    search.set_debug_level(args.debug)
    # Run search
    report = search.lookup(args.subjects,filter=args.filter)
    if args.json:
        print(json.dumps(report))
    else:
        yaml.dump(report,sys.stdout)