Finally making some progress here. We have basic plugin loading and IP/hostname distribution. We need to be careful about not consuming too much plugin functionality in the main file.

This commit is contained in:
Daniel Dayley 2019-12-03 13:27:13 -07:00
parent 6a18f2758b
commit 73517d17d5
8 changed files with 264 additions and 1 deletions

141
ifxtool Normal file → Executable file
View File

@ -1,4 +1,143 @@
#!/usr/local/bin/python3
# ifxtool, a command-line API client for BlueCat, F5, and Palo Alto equipment.
# ifxtool, A python command-line client for infrastructure equipment.
# November 2019 by Daniel Dayley
import os
import sys
import requests
import argparse
import ipaddress
import socket
import yaml
import json
import re
import importlib
yaml.Dumper.ignore_aliases = lambda *args : True
class IFXTool():
configdir = os.path.expanduser('~/.config')
configfilename = 'ifxtool.yml'
servicetemplate = {'hosts':[],'username':'','key':''}
configtemplate = {'bluecat':servicetemplate,'f5':servicetemplate,'paloalto':servicetemplate,'aruba':servicetemplate,'openvpn':servicetemplate}
stopinfo = ''
namespace = ''
servicedelegates = []
totalconfig = {}
_internal_messages = {}
def __init__(self,delegates,arglist) :
self.servicedelegates = delegates
self.namespace = arglist
# Load configuration
if arglist.config :
try:
self.totalconfig = yaml.safe_load(open(os.path.normpath(arglist.config)))
except Exception as exception :
#self.stopinfo = str(exception.__class__.__name__) + ': ' + str(exception)
self.stopinfo = "There was a problem reading your config file, aborting."
else :
try:
self.totalconfig = yaml.safe_load(open(self.configdir + '/' + self.configfilename))
except FileNotFoundError as exception :
if not os.path.exists(os.path.normpath(self.configdir)):
os.makedirs(os.path.normpath(self.configdir))
if not os.path.exists(os.path.normpath(self.configdir + '/' + self.configfilename)):
with open(self.configdir + '/' + self.configfilename, 'w') as output :
yaml.dump(self.configtemplate, output)
self.stopinfo='Config file ' + self.configdir + '/' + self.configfilename + ' is empty, aborting.'
else :
self.stopinfo = "There was a problem reading your config file, aborting."
if self.totalconfig == {} :
with open(self.configdir + '/' + self.configfilename, 'w') as output :
yaml.dump(self.configtemplate, output)
self.stopinfo='Config file ' + self.configdir + '/' + self.configfilename + ' is empty, populating with a template.'
def dispatch(self) :
# Resolve subject names
subjecttuples = []
for item in self.namespace.subjects :
try:
quicklookup = socket.gethostbyname(item)
if quicklookup != item:
subjecttuple = socket.gethostbyname_ex(item)
else :
subjecttuple = socket.gethostbyaddr(item)
except socket.gaierror:
self.stopinfo = "Unable to resolve " + item + ", skipping."
except socket.herror:
subjecttuple = tuple(['',[],[item]])
# Convert mutable types to immutable (to be used as dictionary keys)
immutablesubjecttuple = tuple([subjecttuple[0],tuple(subjecttuple[1]),tuple(subjecttuple[2])])
subjecttuples.append(immutablesubjecttuple)
# Stop on initialization error
if self.stopinfo != '' :
self._internal_messages['Status'] = self.stopinfo
self.print_results(self._internal_messages)
exit(1)
# Dispatch service delegates
delegatereports = {}
for delegate in self.servicedelegates :
delegatename = delegate.__module__.split('plugins.')[1]
if delegatename in self.totalconfig.keys() :
delegateconfig = self.totalconfig[delegatename]
else :
delegateconfig = {}
delegateinstance = delegate(delegateconfig,self.namespace,subjecttuples,delegatereports)
print(delegatename + ": " + delegateinstance._status)
delegatereports.update({delegatename:delegateinstance._lookup_subjects()})
# Format and display results
delegatereports.update(self._internal_messages)
self.print_results(delegatereports)
def get_dc_from_hostname(self,hostname) :
"""Given a hostname, return the datacenter the host resides in."""
dcmatch = re.search('^[a-zA-Z]*-([a-zA-Z0-9]*)(-.*)*$',hostname)
if dcmatch :
return dcmatch.group(1)
else :
return ''
def get_dc_from_ip(self,ip) :
"""Given an IP, return the datacenter the IP resides in."""
pass
def print_results(self,dataset) :
"""Prints the submitted dataset to the console"""
if self.namespace.json :
print(json.dumps(dataset))
else :
yaml.dump(dataset,sys.stdout)
if __name__ == "__main__":
# Load Plugins
plugindir="plugins"
pluginfiles = [x for x in os.listdir(plugindir) if x != "servicebase.py" and x.endswith('.py')]
importlib.import_module(plugindir)
delegates = []
for pluginfile in pluginfiles :
delegates.append(getattr(importlib.import_module(plugindir+"."+pluginfile.split('.')[0]),"ServiceDelegate"))
# Gather Argument options
parser = argparse.ArgumentParser()
parser.add_argument('-j', '--json', action='store_true', help="Return results as a json object")
parser.add_argument('-l', '--link', action='store_true', help="Return physical link information about the subject")
parser.add_argument('-c', '--config', action='store', help="Specify a config file (~/.config/ifxtool.yml by default)")
for delegate in delegates :
arguments = delegate.get_arguments()
if arguments and len(arguments) == 4 :
parser.add_argument(arguments[0],arguments[1],action=arguments[2],help=arguments[3])
parser.add_argument('-a', '--all', action='store_true', help="Return all searchable information about the subject")
parser.add_argument('-v', '--debug', action='store_true', help="Include debug information in the output")
parser.add_argument('subjects', metavar='subjects', nargs='+', help='IPs or hostnames to look up.')
args = parser.parse_args()
# Dispatch master object
main = IFXTool(delegates,args)
main.dispatch()

BIN
plugins/.DS_Store vendored Normal file

Binary file not shown.

14
plugins/aruba.py Normal file
View File

@ -0,0 +1,14 @@
from servicebase import ServiceBase
class ServiceDelegate(ServiceBase) :
def get_arguments() :
"""Returns an array of information used to construct an argumentparser argument."""
# [ <short flag>,<unix flag>,<arg type>,<description> ]
# Example return: [ '-n', '--net', 'store_true', "Return network information about the subject" ]
return ['-w', '--wifi', 'store_true', "Return wireless connection information about the subject"]
def get_ap_list() :
"""Returns the access points available from the aruba controllers."""
pass

24
plugins/bluecat.py Normal file
View File

@ -0,0 +1,24 @@
from servicebase import ServiceBase
class ServiceDelegate(ServiceBase) :
def get_arguments() :
"""Returns an array of information used to construct an argumentparser argument."""
# [ <short flag>,<unix flag>,<arg type>,<description> ]
# Example return: [ '-n', '--net', 'store_true', "Return network information about the subject" ]
return ['-n', '--net', 'store_true', "Return network information about the subject"]
def get_bc_auth_header(host,username,password) :
"""Given authentication information, return the authenticated header for use in REST requests to a bluecat device."""
pass
def get_bc_parent_object_id(bc_id) :
"""Given a Bluecat Object ID, return the Object ID of the parent."""
pass
def get_bc_onject_name(bc_id) :
"""Given a Bluecat Object ID, return the name of the object."""
pass
def host_to_ip(hostname) :
"""Given a hostname, return the IP address of the hostname."""
pass

18
plugins/f5.py Normal file
View File

@ -0,0 +1,18 @@
from servicebase import ServiceBase
class ServiceDelegate(ServiceBase) :
def get_arguments() :
"""Returns an array of information used to construct an argumentparser argument."""
# [ <short flag>,<unix flag>,<arg type>,<description> ]
# Example return: [ '-n', '--net', 'store_true', "Return network information about the subject" ]
return ['-cg', '--vip', 'store_true', "Return VIP information about the subject"]
def get_f5_auth_header(host,username,password,authprovider) :
"""Given authentication information, return the authenticated header for use in REST requests to an F5 device."""
pass
def get_f5_for_dc(dc) :
"""Given a datacenter, return the next F5 device for that datacenter."""
pass
def get_vip_cg(ip) :
"""Given an IP, return all VIPs that the IP is a member of."""
pass

15
plugins/openvpn.py Normal file
View File

@ -0,0 +1,15 @@
from servicebase import ServiceBase
class ServiceDelegate(ServiceBase) :
def get_arguments() :
"""Returns an array of information used to construct an argumentparser argument."""
# [ <short flag>,<unix flag>,<arg type>,<description> ]
# Example return: [ '-n', '--net', 'store_true', "Return network information about the subject" ]
return ['-r', '--vpn','store_true',"Return VPN information about the subject"]
def get_user_from_ip(ip) :
"""Given an IP address, return the user who was last assigned the address."""
pass
def get_user_login_time_from_ip(ip) :
"""Given an IP address, return the time of the last assignment of the address"""
pass

15
plugins/paloalto.py Normal file
View File

@ -0,0 +1,15 @@
from servicebase import ServiceBase
class ServiceDelegate(ServiceBase) :
def get_arguments() :
"""Returns an array of information used to construct an argumentparser argument."""
# [ <short flag>,<unix flag>,<arg type>,<description> ]
# Example return: [ '-n', '--net', 'store_true', "Return network information about the subject" ]
return ['-fw', '--rules', 'store_true', "Return Firewall rules relating to the subject"]
def get_pa_auth_header(host,username,password) :
"""Given authentication information, return the authenticated header for use in REST requests to a Palo Alto device."""
pass
def get_pa_for_dc(dc) :
"""Given a datacenter, return the next Palo Alto device for that datacenter."""
pass

38
servicebase.py Normal file
View File

@ -0,0 +1,38 @@
class ServiceBase :
config = {}
namespace = {}
data = {}
_status = "Ready"
_subjects = []
def __init__(self,config,namespace,subjects,dossiercopy):
if not config or config == {} :
self.results.update({"No configuration found":"Nothing to do"})
self._status = "Error"
else :
self.config = config
self.data = dossiercopy
self.namespace = namespace
self._subjects = subjects
pass
def _lookup_subjects(self) :
finaldictionary = {}
for item in self._subjects :
finaldictionary.update({item:self.perform_lookup(item)})
return finaldictionary
def get_arguments() :
"""Returns an array of information used to construct an argumentparser argument."""
# [ <short flag>,<unix flag>,<arg type>,<description> ]
# Example return: [ '-n', '--net', 'store_true', "Output network information about the subject" ]
return None
def perform_lookup(self,host_tuple) :
"""Returns a JSON string with lookup information about the given IP or Hostname."""
# Host tuple is a socket triple (hostname, aliaslist, ipaddrlist) where hostname is
# the primary host name responding to the given ip_address, aliaslist is a (possibly
# empty) tuple of alternative host names for the same address, and ipaddrlist is a tuple
# of IPv4/v6 addresses for the same interface on the same host (most likely containing
# only a single address).
return None