ifxlookup/ifxtool

166 lines
6.1 KiB
Python
Executable File

#!/usr/local/bin/python3
# ifxtool, A python command-line client for infrastructure equipment.
# November 2019 by Daniel Dayley
import os
import sys
import requests
import argparse
import ipaddress
import socket
import yaml
import json
import re
import importlib
# PyYAML normally writes anchors/aliases (&id001 / *id001) when the same object
# appears more than once in the data being dumped.  Answering True for every
# object makes the dumper write each occurrence out in full instead.
yaml.Dumper.ignore_aliases = lambda *_unused: True
class IFXTool():
    """Master object: loads the configuration, resolves the requested
    subjects through DNS, runs every service plugin and prints the result.

    Plugin ("delegate") contract as used by this class: a delegate is
    constructed with ``(config, namespace, subjecttuples, reports_so_far)``
    and exposes ``_lookup_subjects()``.  A truthy return value is stored
    under the plugin name and is later indexed by subject tuple.
    """
    # Default configuration location.
    configdir = os.path.expanduser('~/.config')
    configfilename = 'ifxtool.yml'
    # Skeleton written when the default config is missing or empty.  All
    # service entries reference the same template dict object.
    servicetemplate = {'hosts':[],'username':'','key':''}
    configtemplate = {'bluecat':servicetemplate,'f5':servicetemplate,'paloalto':servicetemplate,'aruba':servicetemplate,'openvpn':servicetemplate}
    # Class-level defaults kept for backward compatibility.  The mutable
    # ones are rebound per instance in __init__ so instances never share
    # (and accumulate into) the same list or dict.
    errorinfo = []
    warninfo = []
    namespace = ''
    servicedelegates = []
    totalconfig = {}
    _internal_messages = {}
    _subjecttuples = []
    _dnsmaps = {}

    def __init__(self, delegates, arglist):
        """Store the plugin classes and parsed arguments, then load the config.

        Args:
            delegates: plugin classes; dispatch() instantiates each of them.
            arglist: parsed argument namespace.  This class reads its
                ``config``, ``subjects`` and ``json`` attributes.

        Configuration problems are not raised here; they are appended to
        ``self.errorinfo`` and make dispatch() abort.  Failing to write the
        config template still raises.
        """
        self.servicedelegates = delegates
        self.namespace = arglist
        # Fresh containers per instance (see the class-level note).
        self.errorinfo = []
        self.warninfo = []
        self.totalconfig = {}
        self._internal_messages = {}
        self._subjecttuples = []
        self._dnsmaps = {}

        # Load configuration
        explicit = bool(arglist.config)
        if explicit:
            configpath = os.path.normpath(arglist.config)
        else:
            configpath = self.configdir + '/' + self.configfilename
        try:
            loaded = self._read_config(configpath)
        except FileNotFoundError:
            if explicit:
                self.errorinfo.append("There was a problem reading your config file, aborting.")
            else:
                # First run: leave a template behind for the user to fill in.
                self._write_config_template(configpath)
                self.errorinfo.append('Config file ' + configpath + ' is empty, populating with a template.')
            return
        except Exception:
            # Any other read or parse failure is reported the same way.
            self.errorinfo.append("There was a problem reading your config file, aborting.")
            return

        # An empty YAML document loads as None, not {}.
        if loaded is None or loaded == {}:
            if explicit:
                # Never touch the default config on behalf of an explicit one.
                self.errorinfo.append('Config file ' + configpath + ' is empty, aborting.')
            else:
                self._write_config_template(configpath)
                self.errorinfo.append('Config file ' + configpath + ' is empty, populating with a template.')
        elif not isinstance(loaded, dict):
            # dispatch() looks plugins up by name, so a mapping is required.
            self.errorinfo.append("There was a problem reading your config file, aborting.")
        else:
            self.totalconfig = loaded

    def _read_config(self, path):
        """Parse the YAML file at *path* and return the loaded object."""
        with open(path) as handle:
            return yaml.safe_load(handle)

    def _write_config_template(self, path):
        """Create the config directory if needed and write the template to *path*."""
        os.makedirs(os.path.normpath(self.configdir), exist_ok=True)
        with open(path, 'w') as output:
            yaml.dump(self.configtemplate, output)

    def _resolve_subject(self, item):
        """Resolve *item* to a hashable ``(hostname, aliases, addresses)`` tuple.

        Returns None when the lookup fails.
        """
        try:
            # gethostbyname() returns an IPv4 address string unchanged, so an
            # unchanged result means *item* is an address: look it up in
            # reverse.  Otherwise it is a name: look it up forward.
            if socket.gethostbyname(item) != item:
                hostname, aliases, addresses = socket.gethostbyname_ex(item)
            else:
                hostname, aliases, addresses = socket.gethostbyaddr(item)
        except (socket.gaierror, socket.herror):
            return None
        # Lists become tuples so the result can be used as a dictionary key.
        return (hostname, tuple(aliases), tuple(addresses))

    def dispatch(self):
        """Resolve the subjects, run every plugin and print the combined report.

        Raises:
            SystemExit: with status 1 when __init__ recorded an error.  The
                messages are written to stderr first.
        """
        # Stop on initialization error, before spending time on DNS.
        if self.errorinfo:
            self._internal_messages['error'] = self.errorinfo
            for message in self.errorinfo:
                print(message, file=sys.stderr)
            sys.exit(1)

        # Resolve subject names.  Iterate over a snapshot: unresolvable
        # entries are removed from the live list, and removing from a list
        # while iterating it skips the following element.
        for item in list(self.namespace.subjects):
            resolved = self._resolve_subject(item)
            if resolved is None:
                self.warninfo.append("Unable to resolve " + item)
                self.namespace.subjects.remove(item)
                continue
            self._dnsmaps[resolved] = item
            self._subjecttuples.append(resolved)

        # Dispatch service delegates
        delegatereports = {}
        for delegate in self.servicedelegates:
            # Plugin name is the module name below the "plugins." package.
            delegatename = delegate.__module__.split('plugins.')[1]
            delegateconfig = self.totalconfig.get(delegatename, {})
            # Each delegate also receives the reports gathered so far.
            delegateinstance = delegate(delegateconfig, self.namespace, self._subjecttuples, delegatereports)
            delegateresponse = delegateinstance._lookup_subjects()
            if delegateresponse:
                delegatereports[delegatename] = delegateresponse

        # Format and display results
        if self.warninfo:
            self._internal_messages['warn'] = self.warninfo
        delegatereports.update(self._internal_messages)
        self.print_results(delegatereports)

    def get_dc_from_hostname(self, hostname):
        """Return the second hyphen-separated field of *hostname*.

        The first field must consist of letters only (it may be empty).
        Returns '' when the hostname does not have that shape.
        """
        dcmatch = re.search(r'^[a-zA-Z]*-([a-zA-Z0-9]*)(-.*)*$', hostname)
        return dcmatch.group(1) if dcmatch else ''

    def get_dc_from_ip(self, ip):
        """Placeholder for an IP-based datacenter lookup.

        Not implemented: always returns None.
        """
        pass

    def print_results(self, dataset):
        """Print *dataset* regrouped per subject, as JSON or YAML.

        Args:
            dataset: mapping of plugin name to a report indexed by subject
                tuple.  The optional keys 'error' and 'warn' hold message
                lists and are emitted under a top-level 'status' key.

        Output goes to stdout: JSON when ``self.namespace.json`` is set,
        YAML otherwise.  Nothing is returned.
        """
        finalresult = {}
        status = {level: dataset[level] for level in ('error', 'warn') if level in dataset}
        if status:
            finalresult['status'] = status
        plugins = [name for name in dataset if name not in ('error', 'warn')]
        for subject in self._subjecttuples:
            # Report under the name the user typed, not the resolved tuple.
            subjectname = self._dnsmaps[subject]
            finalresult[subjectname] = {plugin: dataset[plugin][subject] for plugin in plugins}
        if self.namespace.json:
            print(json.dumps(finalresult))
        else:
            yaml.dump(finalresult, sys.stdout)
if __name__ == "__main__":
    # Load Plugins
    plugindir = "plugins"
    # The plugin package is imported through sys.path (the script's own
    # directory), so list that same directory instead of whatever "plugins"
    # happens to mean relative to the current working directory.
    pluginpath = os.path.join(os.path.dirname(os.path.realpath(__file__)), plugindir)
    pluginfiles = [x for x in os.listdir(pluginpath) if x != "servicebase.py" and x.endswith('.py')]
    importlib.import_module(plugindir)
    # Every plugin module must expose a class named ServiceDelegate.
    delegates = [
        getattr(importlib.import_module(plugindir + "." + pluginfile.split('.')[0]), "ServiceDelegate")
        for pluginfile in pluginfiles
    ]

    # Gather Argument options
    parser = argparse.ArgumentParser()
    parser.add_argument('-j', '--json', action='store_true', help="Return results as a json object")
    parser.add_argument('-l', '--link', action='store_true', help="Return physical link information about the subject")
    parser.add_argument('-c', '--config', action='store', help="Specify a config file (~/.config/ifxtool.yml by default)")
    for delegate in delegates:
        # get_arguments is called on the class itself, which is passed as its
        # own first argument.  A plugin contributes an option only when it
        # returns exactly (short flag, long flag, action, help text).
        arguments = delegate.get_arguments(delegate)
        if arguments and len(arguments) == 4:
            shortflag, longflag, action, helptext = arguments
            parser.add_argument(shortflag, longflag, action=action, help=helptext)
    parser.add_argument('-a', '--all', action='store_true', help="Return all searchable information about the subject")
    parser.add_argument('-v', '--debug', action='store_true', help="Include debug information in the output")
    parser.add_argument('subjects', metavar='subjects', nargs='+', help='IPs or hostnames to look up.')
    args = parser.parse_args()

    # Dispatch master object
    main = IFXTool(delegates, args)
    main.dispatch()