ifxlookup/ifxtool

169 lines
6.3 KiB
Plaintext
Raw Normal View History

2019-11-25 18:16:03 +00:00
#!/usr/local/bin/python3
# ifxtool, a Python command-line client for infrastructure equipment.
# November 2019 by Daniel Dayley
import os
import sys
import requests
import argparse
import ipaddress
import socket
import yaml
import json
import re
import importlib
# Globally patch PyYAML's default Dumper so that objects appearing more than
# once in a dumped structure are written out in full each time instead of
# being emitted as YAML anchors/aliases (&id001 / *id001).
yaml.Dumper.ignore_aliases = lambda *args : True
class IFXTool():
    """Load the ifxtool configuration and fan subject lookups out to plugins.

    An instance is built from the list of plugin ``ServiceDelegate`` classes
    and the parsed command-line namespace.  ``dispatch()`` resolves every
    subject, runs each delegate, and prints the merged report as YAML or JSON.
    """

    configdir = os.path.expanduser('~/.config')
    configfilename = 'ifxtool.yml'
    servicetemplate = {'hosts':[],'username':'','key':''}
    configtemplate = {'bluecat':servicetemplate,'f5':servicetemplate,'paloalto':servicetemplate,'aruba':servicetemplate,'openvpn':servicetemplate}
    # Class-level defaults kept for backward compatibility only.  __init__
    # rebinds a fresh container for each of these on every instance so that
    # state is never shared between instances through the class.
    errorinfo = []
    warninfo = []
    namespace = ''
    servicedelegates = []
    totalconfig = {}
    _internal_messages = {}
    _subjecttuples = []
    _dnsmaps = {}
    _config_problem = "There was a problem reading your config file, aborting."

    def __init__(self, delegates, arglist):
        """Store the delegates and arguments, then load the configuration.

        Args:
            delegates: iterable of plugin ``ServiceDelegate`` classes.
            arglist: parsed argparse namespace; ``arglist.config`` optionally
                names a config file to use instead of the default one.

        Configuration problems are recorded in ``self.errorinfo`` rather than
        raised; ``dispatch()`` reports them and exits.
        """
        self.servicedelegates = delegates
        self.namespace = arglist
        # Fresh per-instance state (see the note on the class attributes).
        self.errorinfo = []
        self.warninfo = []
        self.totalconfig = {}
        self._internal_messages = {}
        self._subjecttuples = []
        self._dnsmaps = {}
        self._load_config(arglist.config)

    def _write_config_template(self, path):
        """Write the empty service template to ``path`` as YAML."""
        with open(path, 'w') as output:
            yaml.dump(self.configtemplate, output)

    def _load_config(self, configpath):
        """Populate ``self.totalconfig`` or record why that was not possible.

        ``configpath`` is the user-supplied ``--config`` value (may be falsy,
        in which case ``configdir/configfilename`` is used).  A template is
        only ever written to the default location, never over a file the user
        named explicitly.
        """
        explicit = bool(configpath)
        if explicit:
            path = os.path.normpath(configpath)
        else:
            path = self.configdir + '/' + self.configfilename

        try:
            # Context manager so the handle is closed even if parsing fails.
            with open(path) as handle:
                loaded = yaml.safe_load(handle)
        except FileNotFoundError:
            if explicit:
                self.errorinfo.append(self._config_problem)
                return
            # First run: create the config directory and seed a template.
            try:
                os.makedirs(os.path.normpath(self.configdir), exist_ok=True)
                if os.path.exists(os.path.normpath(path)):
                    # The file appeared (or is unreadable) despite the error.
                    self.errorinfo.append(self._config_problem)
                    return
                self._write_config_template(path)
            except OSError:
                self.errorinfo.append(self._config_problem)
                return
            self.errorinfo.append('Config file ' + path + ' is empty, aborting.')
            return
        except (OSError, ValueError, yaml.YAMLError):
            # Unreadable, undecodable or malformed file.
            self.errorinfo.append(self._config_problem)
            return

        if not loaded:
            # safe_load returns None for an empty file; {} is equally useless.
            if explicit:
                self.errorinfo.append('Config file ' + path + ' is empty, aborting.')
                return
            try:
                self._write_config_template(path)
            except OSError:
                self.errorinfo.append(self._config_problem)
                return
            self.errorinfo.append('Config file ' + path + ' is empty, populating with a template.')
            return

        if not isinstance(loaded, dict):
            # dispatch() looks plugins up by name, so a mapping is required.
            self.errorinfo.append(self._config_problem)
            return

        self.totalconfig = loaded

    def _resolve_subject(self, item):
        """Resolve one subject into a hashable ``(name, aliases, addresses)``.

        The shape mirrors ``socket.gethostbyname_ex``.  Unresolvable hostnames
        add a warning and yield ``(item, (), ('',))``; IP literals that the
        IPv4-only resolver rejects yield their reverse-pointer name with the
        literal as the only address.
        """
        try:
            ipaddress.ip_address(item)
        except ValueError:
            # Not an IP literal: treat it as a hostname.
            try:
                resolved = socket.gethostbyname_ex(item)
            except (socket.gaierror, socket.herror, UnicodeError):
                # Not a valid host or IP
                self.warninfo.append("Unable to resolve " + item)
                resolved = [item, [], ['']]
        else:
            try:
                resolved = socket.gethostbyname_ex(item)
            except (socket.gaierror, socket.herror):
                # Valid IP literal the resolver cannot handle (e.g. IPv6).
                resolved = [ipaddress.ip_address(item).reverse_pointer, [], [str(item)]]
        # Convert mutable types to immutable (to be used as dictionary keys)
        return (resolved[0], tuple(resolved[1]), tuple(resolved[2]))

    def dispatch(self):
        """Resolve the subjects, run every delegate and print the report.

        Exits with status 1, after writing the messages to stderr, when
        initialization recorded any error.
        """
        # Stop on initialization error, before doing any lookups.
        if self.errorinfo:
            self._internal_messages['error'] = self.errorinfo
            for message in self.errorinfo:
                print(message, file=sys.stderr)
            sys.exit(1)

        # Resolve subject names
        for item in self.namespace.subjects:
            resolved = self._resolve_subject(item)
            self._dnsmaps[resolved] = item
            self._subjecttuples.append(resolved)

        # Dispatch service delegates
        delegatereports = {}
        for delegate in self.servicedelegates:
            delegatename = delegate.__module__.split('plugins.')[1]
            delegateconfig = self.totalconfig.get(delegatename, {})
            # Earlier delegates' reports are handed to later ones.
            delegateinstance = delegate(delegateconfig, self.namespace, self._subjecttuples, delegatereports)
            delegateresponse = delegateinstance._lookup_subjects()
            if delegateresponse:
                delegatereports[delegatename] = delegateresponse

        # Format and display results
        if self.warninfo:
            self._internal_messages['warn'] = self.warninfo
        delegatereports.update(self._internal_messages)
        self.print_results(delegatereports)

    def get_dc_from_hostname(self, hostname):
        """Given a hostname, return the datacenter the host resides in.

        The datacenter is the second hyphen-separated field of the hostname;
        an empty string is returned when the name does not fit that pattern.
        """
        dcmatch = re.search(r'^[a-zA-Z]*-([a-zA-Z0-9]*)(-.*)*$', hostname)
        return dcmatch.group(1) if dcmatch else ''

    def print_results(self, dataset):
        """Print ``dataset`` regrouped per subject, as JSON or YAML.

        ``dataset`` maps plugin names to ``{subjecttuple: data}`` reports and
        may carry ``'error'`` / ``'warn'`` message lists, which are emitted
        under a top-level ``status`` key.  Output goes to stdout; JSON is used
        when ``self.namespace.json`` is set, YAML otherwise.
        """
        finalresult = {}
        status = {level: dataset[level] for level in ('error', 'warn') if level in dataset}
        if status:
            finalresult['status'] = status

        plugins = [name for name in dataset if name not in ('error', 'warn')]
        for subject in self._subjecttuples:
            subjectname = self._dnsmaps[subject]
            finalresult[subjectname] = {plugin: dataset[plugin][subject] for plugin in plugins}

        if self.namespace.json:
            print(json.dumps(finalresult))
        else:
            yaml.dump(finalresult, sys.stdout)
if __name__ == "__main__":
    # Load Plugins
    # The "plugins" package is imported relative to the script's directory
    # (sys.path[0]), so list that same directory rather than whatever the
    # current working directory happens to be.
    plugindir = "plugins"
    pluginpath = os.path.join(os.path.dirname(os.path.realpath(__file__)), plugindir)
    # servicebase.py is excluded by name and a package __init__.py is not a plugin.
    pluginfiles = sorted(
        x for x in os.listdir(pluginpath)
        if x.endswith('.py') and x not in ("servicebase.py", "__init__.py")
    )
    importlib.import_module(plugindir)
    # Every plugin module must expose a class named ServiceDelegate.
    delegates = [
        getattr(importlib.import_module(plugindir + "." + pluginfile.split('.')[0]), "ServiceDelegate")
        for pluginfile in pluginfiles
    ]

    # Gather Argument options
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', action='store', help="Specify a config file (~/.config/ifxtool.yml by default)")
    parser.add_argument('-j', '--json', action='store_true', help="Return results as a json object")
    parser.add_argument('-l', '--link', action='store_true', help="Return physical link information about the subject")
    for delegate in delegates:
        # get_arguments is called on the class itself, passing the class as self.
        arguments = delegate.get_arguments(delegate)
        # A plugin contributes one option as (short flag, long flag, action, help).
        if arguments and len(arguments) == 4:
            parser.add_argument(arguments[0], arguments[1], action=arguments[2], help=arguments[3])
    parser.add_argument('-a', '--all', action='store_true', help="Return all searchable information about the subject")
    parser.add_argument('-v', '--debug', action='store_true', help="Include debug information in the output")
    parser.add_argument('subjects', metavar='subjects', nargs='+', help='IPs or hostnames to look up.')
    args = parser.parse_args()

    # Dispatch master object
    main = IFXTool(delegates, args)
    main.dispatch()