Some good progress on making it a module

This commit is contained in:
Daniel Dayley 2020-05-21 20:34:56 -06:00
parent cf477c7d5e
commit 5211142f5c
10 changed files with 426 additions and 187 deletions

434
ifxlookup
View File

@ -1,4 +1,4 @@
#!/usr/local/bin/python3
#!bin/python3
# ifxlookup, A python command-line client for infrastructure equipment.
# November 2019 by Daniel Dayley
@ -7,7 +7,9 @@ import sys
import requests
import argparse
import ipaddress
import jsonpath_ng
import socket
import inspect
import yaml
import json
import re
@ -15,142 +17,320 @@ import importlib
yaml.Dumper.ignore_aliases = lambda *args : True
class IFXLookup():
DEFAULT_CONFIG = '~/.config/ifxlookup.yml'
_errorinfo = {}
_warninfo = {}
_plugin_map = {}
_delegate_map = {}
_internal_messages = {}
_debug_level = 0
_config = {}
_caching = False
_initialized_plugins = []
def __init__(self,config=None) :
if config :
self.load_config(config=config)
pass
def __add_plugin(self,plugin,reload) :
"""Adds a given ServiceBase plugin to the instance, reinitializing it if it already exists and such is specified."""
plugin_name = plugin.__module__.split('.')[-1]
if not reload and plugin_name in self._plugin_map.keys():
pass
else :
# We can't startup the plugin here because it hasn't been configured. We'll handle that at runtime.
try:
# Remove any intialized objects of the same name, forcing a reinitialization
self._initialized_plugins.remove(self._delegate_map[plugin_name])
except:
pass
self._plugin_map.update({plugin_name:plugin})
self._delegate_map.update({plugin_name:plugin()})
def __init_once(self,plugin_obj) :
if plugin_obj not in self._initialized_plugins :
plugin_obj.startup()
self._initialized_plugins.append(plugin_obj)
def __shutdown_plugins(self) :
"""Shutdown all plugins."""
for plugin in self._delegate_map.values() :
plugin.shutdown()
def __namespace(self) :
"""Generate map of ifxlookup runtime values that should be made available to plugins."""
return {'debug' : self._debug_level,'caching': self._caching}
def __filter_and_return(self,subjects,dataset,filter) :
"""Returns the submitted dataset"""
final_result = {}
if 'error' in dataset or 'warn' in dataset :
final_result['status'] = {}
if 'error' in dataset :
final_result['status']['error'] = dataset['error']
if 'warn' in dataset :
final_result['status']['warn'] = dataset['warn']
for subject in subjects :
subjectdataset = {}
for plugin in [x for x in dataset if x != 'error' and x != 'warn'] :
subjectdataset.update({plugin:dataset[plugin][subject]})
final_result.update({subject:subjectdataset})
# Apply Filter
if filter :
jsonpath_filter = jsonpath_ng.parse(filter)
final_result = [x.value for x in jsonpath_filter.find(final_result)]
return final_result
def use_plugins(self,plugins,reload=False) :
"""Defines plugins that should be used in a lookup, optionally forcing them to reset."""
# Verify data
if type(plugins) is not list :
raise ValueError('argument \'plugins\' should be of type list')
for plugin in plugins :
# Check if the plugin is a string or a descendent of the ServiceBase class
if type(plugin) is not str and 'ServiceBase' not in [x.__name__ for x in inspect.getmro(plugin)] :
raise ValueError('unkown type for plugin')
# Find plugins by name using a default path
if type(plugin) is str :
pluginfolder = 'plugins'
testpath = os.path.realpath(__file__)
filedir = '/'.join(testpath.split('/')[:-1])
pluginpath = filedir + '/' + pluginfolder
pluginfiles = [x for x in os.listdir(pluginpath) if plugin in x and x.endswith('.py')]
pluginfiles = sorted(pluginfiles)
if len(pluginfiles) == 0 :
raise FileNotFoundError(plugin + '.py not found')
importlib.import_module(pluginfolder)
for pluginfile in pluginfiles :
plugin = getattr(importlib.import_module(pluginfolder+"."+pluginfile.split('.')[0]),'ServiceDelegate')
self.__add_plugin(plugin,reload)
continue
# Load normal ServiceBase classes
if 'ServiceBase' in [x.__name__ for x in inspect.getmro(plugin)] :
self.__add_plugin(plugin,reload)
continue
return self
def get_plugins(self) :
"""Returns a map of plugins configured to use in the lookup."""
return self._delegate_map
def search_plugins(self,directory='/'.join(os.path.realpath(__file__).split('/')[:-1]) + '/' + 'plugins') :
"""Searches a given directory for compatible plugins and returns a map of available plugin names and classes"""
directory = os.path.normpath(os.path.expanduser(os.path.expandvars(directory)))
name_map = {}
candidates = {x.split('.')[0]:x for x in os.listdir(directory) if x.endswith('.py') and x != 'servicebase.py'}
for name,filename in candidates.items() :
spec = importlib.util.spec_from_file_location(name, directory + '/' + filename)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
instance = getattr(mod,'ServiceDelegate')
name_map.update({filename.split('.')[0]:instance})
return name_map
def configure(self,config) :
"""Updates the configuration of the lookup with the given config."""
if type(config) is not dict :
raise TypeError('argument \'config\' should be of type dict')
self._config.update(config)
def load_config_from_file(self,config=DEFAULT_CONFIG) :
"""Loads a JSON or YAML file containing a configuration map for plugins. Used to configure several plugins at once."""
if config :
file_descriptor = open(os.path.normpath(os.path.expanduser(os.path.expandvars(config))))
try :
file_descriptor = open(os.path.normpath(os.path.expanduser(os.path.expandvars(config))))
totalconfig = json.load(file_descriptor)
except json.decoder.JSONDecodeError as exception :
file_descriptor = open(os.path.normpath(os.path.expanduser(os.path.expandvars(config))))
totalconfig = yaml.safe_load(file_descriptor)
self._config.update(totalconfig)
def dump_config(self) :
"""Returns the current configuration state. This likely contains sensitive information such as API keys."""
return self._config
def dump_config_to_file(self,path,print_json=False) :
"""Exports the current plugin configuration to a file."""
with open(os.path.normpath(os.path.expanduser(os.path.expandvars(path))),'a') as file_handle :
if print_json :
json.dump(self._config, file_handle)
else :
yaml.dump(self._config, file_handle, default_flow_style=False)
def set_debug_level(self,level) :
"""Sets the debug level"""
if not level :
level = 0
self._debug_level = int(level)
return self
def debug_level(self) :
"""Gets the debug level"""
return this._debug_level
def enable_caching(self) :
"""Enables results caching and disables auto-shutdown of plugins."""
self._caching = True
return self
def disable_caching(self) :
"""Disables results caching and enables auto-shutdown of plugins."""
self._caching = False
return self
def lookup(self,subjects,filter=None) :
"""Performs a search with the configured plugins and filter and returns a dictionary of search data."""
hints = {}
final_report = {}
error_info = {}
warn_info = {}
# Assert that each plugin has a config
for delegatename,delegate in self._delegate_map.items() :
if not delegatename in self._config.keys() :
raise KeyError('no configuration provided for plugin ' + delegatename)
# Assert that 'subjects' is a list
if type(subjects) is not list :
raise TypeError('expected list but got ' + str(type(subjects)))
# Assert that subjects are strings
for subject in subjects :
if type(subject) is not str :
raise TypeError('subjects must be of type \'str\'')
for delegatename,delegate in self._delegate_map.items() :
# Add runtime info from ifxlookup
delegateconfig = self._config[delegatename]
delegateconfig.update({'_namespace':self.__namespace()})
# Configure and start the plugin
delegate._set_config(delegateconfig)
delegate._set_hints(hints)
# Only runs once
self.__init_once(delegate)
report = {}
for subject in subjects :
report.update({subject:delegate.lookup(subject)})
if delegate.error != [] :
error_info.update({delegatename:delegate.error})
if delegate.warn != [] :
warn_info.update({delegatename:delegate.warn})
report.update(error_info)
report.update(warn_info)
if report and report != {}:
final_report.update({delegatename:report})
# If caching is enabled prevent shutdown. Otherwise shutdown.
if not self._caching :
self.__shutdown_plugins()
return self.__filter_and_return(subjects,final_report,filter)
def finish(self) :
"""In the case that caching has been enabled, this enables the plugins to be manually shut down."""
self.__shutdown_plugins()
if __name__ == '__main__':
# Command-line client
search = IFXLookup()
available_plugins = search.search_plugins()
# Define constants
configdir = os.path.expanduser('~/.config')
configfilename = 'ifxlookup.yml'
servicetemplate = {'hosts':[],'username':'','key':''}
configtemplate = {'bluecat':servicetemplate,'f5':servicetemplate,'paloalto':servicetemplate,'aruba':servicetemplate,'openvpn':servicetemplate}
configerror = {'config': 'There was a problem reading your config file, aborting.'}
errorinfo = {}
warninfo = {}
namespace = ''
servicedelegates = []
totalconfig = None
_internal_messages = {}
_subjects = []
def __init__(self,delegates,arglist) :
self.servicedelegates = delegates
self.namespace = arglist
# Load configuration
if arglist.config :
try:
self.totalconfig = yaml.safe_load(open(os.path.normpath(arglist.config)))
except Exception as exception :
self.errorinfo.update({"config": "There was a problem reading your config file, aborting."})
else :
try:
self.totalconfig = yaml.safe_load(open(self.configdir + '/' + self.configfilename))
except FileNotFoundError as exception :
if not os.path.exists(os.path.normpath(self.configdir)):
os.makedirs(os.path.normpath(self.configdir))
if not os.path.exists(os.path.normpath(self.configdir + '/' + self.configfilename)):
with open(self.configdir + '/' + self.configfilename, 'w') as output :
yaml.dump(self.configtemplate, output)
self.errorinfo.update({'config': 'Config file ' + self.configdir + '/' + self.configfilename + ' is empty, aborting.'})
else :
self.errorinfo.update({"config": "There was a problem reading your config file, aborting."})
except Exception as exception :
self.errorinfo.update({"config": "There was a problem reading your config file, aborting."})
if self.totalconfig == {} :
with open(self.configdir + '/' + self.configfilename, 'w') as output :
yaml.dump(self.configtemplate, output)
self.errorinfo.update({'config': 'Config file ' + self.configdir + '/' + self.configfilename + ' is empty, populating with a template.'})
def dispatch(self) :
# Resolve subject names
for item in self.namespace.subjects :
# Pass in arguments raw
self._subjects.append(item)
# Stop on initialization error
if self.errorinfo != {} :
self._internal_messages['error'] = self.errorinfo
exit(1)
# Dispatch service delegates
delegatereports = {}
for delegate in self.servicedelegates :
delegatename = delegate.__module__.split('plugins.')[1]
if delegatename in self.totalconfig.keys() :
delegateconfig = self.totalconfig[delegatename]
else :
delegateconfig = {}
# Run Plugin
delegateinstance = delegate(delegateconfig,self.namespace,self._subjects,delegatereports)
delegateresponse = delegateinstance._lookup_subjects()
if delegateinstance.error != [] :
self.errorinfo.update({delegatename:delegateinstance.error})
if delegateresponse:
delegatereports.update({delegatename:delegateresponse})
# Format and display results
if self.warninfo != {} :
self._internal_messages['warn'] = self.warninfo
if self.errorinfo != {} :
self._internal_messages['error'] = self.errorinfo
delegatereports.update(self._internal_messages)
self.print_results(delegatereports)
def get_dc_from_hostname(self,hostname) :
"""Given a hostname, return the datacenter the host resides in."""
dcmatch = re.search('^[a-zA-Z]*-([a-zA-Z0-9]*)(-.*)*$',hostname)
if dcmatch :
return dcmatch.group(1)
else :
return ''
def print_results(self,dataset) :
"""Returns the submitted dataset"""
finalresult = {}
if 'error' in dataset or 'warn' in dataset :
finalresult['status'] = {}
if 'error' in dataset :
finalresult['status']['error'] = dataset['error']
if 'warn' in dataset :
finalresult['status']['warn'] = dataset['warn']
for subject in self._subjects :
subjectdataset = {}
for plugin in [x for x in dataset if x != 'error' and x != 'warn'] :
subjectdataset.update({plugin:dataset[plugin][subject]})
finalresult.update({subject:subjectdataset})
if self.namespace.json :
print(json.dumps(finalresult))
else :
yaml.dump(finalresult,sys.stdout)
if __name__ == "__main__":
# Load Plugins
pluginfolder = 'plugins'
testpath = os.path.realpath(__file__)
filedir = '/'.join(testpath.split('/')[:-1])
pluginpath = filedir + '/' + pluginfolder
plugindir = pluginfolder
pluginfiles = [x for x in os.listdir(pluginpath) if x != "servicebase.py" and x.endswith('.py')]
pluginfiles = sorted(pluginfiles)
importlib.import_module(pluginfolder)
delegates = []
for pluginfile in pluginfiles :
delegates.append(getattr(importlib.import_module(plugindir+"."+pluginfile.split('.')[0]),"ServiceDelegate"))
# Gather Argument options
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', action='store', help="Specify a config file (~/.config/ifxlookup.yml by default)")
parser.add_argument('-j', '--json', action='store_true', help="Return results as a json object")
parser.add_argument('-l', '--link', action='store_true', help="Return physical link information about the subject")
for delegate in delegates :
arguments = delegate.get_arguments(delegate)
parser.add_argument('-c', '--config', action='store', help='Specify a config file (~/.config/ifxlookup.yml by default)')
parser.add_argument('-f', '--filter', action='store', default=None, help='Apply a JSONPath filter to the results')
parser.add_argument('-j', '--json', action='store_true', help='Return results as a json object')
# parser.add_argument('-l', '--link', action='store_true', help="Return physical link information about the subject")
for delegate in available_plugins.values() :
arguments = delegate().get_arguments()
if arguments and len(arguments) == 4 :
parser.add_argument(arguments[0],arguments[1],action=arguments[2],help=arguments[3])
parser.add_argument('-a', '--all', action='store_true', help="Return all searchable information about the subject")
parser.add_argument('-v', '--debug', action='count', help="Include debug information in the output. Add 'v's for more output.")
parser.add_argument('subjects', metavar='subjects', nargs='+', help='IP\'s, hostnames, MAC\'s, or usernames to look up.')
parser.add_argument('-v', '--debug', action='count', help='Include debug information in the output. Add \'v\'s for more output.')
parser.add_argument('-a', '--all', action='store_true', help='Return all searchable information about the subject')
parser.add_argument('subjects', metavar='subjects', nargs='+', help='IP\'s, hostnames, MAC\'s, usernames, or other strings to look up.')
args = parser.parse_args()
# Dispatch master object
main = IFXLookup(delegates,args)
main.dispatch()
# Load plugins based on submitted args
selected_plugins = []
if vars(args)['all'] :
for delegate in available_plugins.values() :
selected_plugins.append(delegate)
else :
for name,delegate in available_plugins.items() :
argument = delegate().get_arguments()[1].split('--')[-1]
if argument in vars(args) and vars(args)[argument] :
selected_plugins.append(delegate)
search.use_plugins(selected_plugins)
# Configure subjects
subjects = []
for item in args.subjects :
subjects.append(item)
# Load configuration
# The command line configuration accepts YAML docs with required keys and values. The command line tool can generate them if
# they do not exist.
if args.config :
# TODO: this section throws a yaml error if a json doc can't be found, errors should be more specific.
try:
# totalconfig = yaml.safe_load(open(os.path.normpath(args.config)))
config = config=os.path.normpath(args.config)
except Exception as exception :
errorinfo.update(configerror)
else :
try:
config = config=os.path.normpath(configdir + '/' + configfilename)
except FileNotFoundError as exception :
if not os.path.exists(os.path.normpath(configdir)):
os.makedirs(os.path.normpath(configdir))
if not os.path.exists(os.path.normpath(configdir + '/' + configfilename)):
with open(configdir + '/' + configfilename, 'w') as output :
yaml.dump(configtemplate, output)
errorinfo.update({'config': 'Config file ' + configdir + '/' + configfilename + ' is empty, aborting.'})
else :
errorinfo.update(configerror)
except Exception as exception :
errorinfo.update(configerror)
if not config :
with open(configdir + '/' + configfilename, 'w') as output :
yaml.dump(configtemplate, output)
errorinfo.update({'config': 'Config file ' + configdir + '/' + configfilename + ' is empty, populating with a template.'})
# Stop on initialization error
if not config :
errorinfo.update(configerror)
try :
file_descriptor = open(os.path.normpath(os.path.expanduser(os.path.expandvars(config))))
totalconfig = json.load(file_descriptor)
except json.decoder.JSONDecodeError as exception :
file_descriptor = open(os.path.normpath(os.path.expanduser(os.path.expandvars(config))))
totalconfig = yaml.safe_load(file_descriptor)
for plugin_name in search.get_plugins().keys() :
if plugin_name not in totalconfig.keys() :
errorinfo.update({'config': 'Config file is missing information for plugin ' + plugin_name})
if errorinfo != {} :
print(subjects,{'error':errorinfo})
exit(1)
search.configure(totalconfig)
search.set_debug_level(args.debug)
report = search.lookup(args.subjects,filter=args.filter)
if args.json :
print(json.dumps(report))
else :
yaml.dump(report,sys.stdout)

View File

@ -41,15 +41,15 @@ class ServiceDelegate(ServiceBase) :
self.aruba_hosts = self.perform_list_query(clientsquery,self.clients_fields,clientssortfield,clientsfilter,clientsdevicetype)
self.debug("Searching Aruba information...",1)
def perform_lookup(self,subject) :
def lookup(self,subject) :
# We return the first result that matches the subject out of our lists of Aruba information.
# Because this isn't very effective we need to order our search by order of least-likely to
# most-likely for the match.
# Search APs first, since they are less likely to match
# By IP
if 'dns' in self.data and subject in self.data['dns'] and self.data['dns'][subject]['addresses'] :
for ip in self.data['dns'][subject]['addresses'] :
if 'dns' in self.hints and subject in self.hints['dns'] and self.hints['dns'][subject]['addresses'] :
for ip in self.hints['dns'][subject]['addresses'] :
for entry in self.aruba_basestations :
if ip in entry.values() :
return entry
@ -62,8 +62,8 @@ class ServiceDelegate(ServiceBase) :
return entry
# Search connected Aruba hosts
# By IP
if 'dns' in self.data and subject in self.data['dns'] and self.data['dns'][subject]['addresses'] :
for ip in self.data['dns'][subject]['addresses'] :
if 'dns' in self.hints and subject in self.hints['dns'] and self.hints['dns'][subject]['addresses'] :
for ip in self.hints['dns'][subject]['addresses'] :
for entry in self.aruba_hosts :
if ip in entry.values() :
return entry
@ -124,9 +124,9 @@ class ServiceDelegate(ServiceBase) :
# Repeat with page size on the query, aggregating results
nextstart = 0
finish = False
finished = False
allitems = []
while not finish :
while not finished :
start_row.text = str(nextstart)
num_rows.text = str(self.query_page_size)
datapayload = "".join(['query=',tostring(aruba_queries).decode(),'&UIDARUBA=',self.session_key])
@ -137,7 +137,7 @@ class ServiceDelegate(ServiceBase) :
rows = pagecontents.iter('row')
for row in rows:
if row == {} :
finish = Tru
finished = True
item = {}
headers = pagecontents.iter('column_name')
for value, header in zip(row,headers) :
@ -148,12 +148,11 @@ class ServiceDelegate(ServiceBase) :
rows = pagecontents.iter('row')
next(rows)
except Exception :
finish = True
finished = True
if page.status_code != 100 and page.status_code !=200 :
finish = True
finished = True
nextstart = nextstart + self.query_page_size
else :
finish = True
finished = True
self.debug("Retrieved " + str((len(allitems))) + " results from query",2)
return allitems

View File

@ -21,7 +21,7 @@ class ServiceDelegate(ServiceBase) :
self.current_auth_header = self.get_bc_auth_header(self.config['host'],self.config['username'],self.config['key'])
self.debug("Searching BlueCat for hosts...",1)
def perform_lookup(self,subject) :
def lookup(self,subject) :
self.return_payload = {}
if self.current_auth_header :
object = self.search_bc_object(subject)

View File

@ -15,7 +15,7 @@ class ServiceDelegate(ServiceBase) :
if 'resolver' in self.config :
self.resolver.nameservers.append(self.config['resolver'])
def perform_lookup(self,subject) :
def lookup(self,subject) :
# Note : hostname lookups always use system resolver.
# TODO: Make host lookups use the config-sepcified resolver.
@ -68,4 +68,3 @@ class ServiceDelegate(ServiceBase) :
return recs
except Exception as exception :
return recs

View File

@ -23,7 +23,7 @@ class ServiceDelegate(ServiceBase) :
self.hosts = self.config['hosts']
self.debug("Logging into F5's and searching for hosts, this make take some time.",1)
def perform_lookup(self,subject) :
def lookup(self,subject) :
self.return_payload = {}
logincandidates = self.choose_host(subject)
@ -134,15 +134,14 @@ class ServiceDelegate(ServiceBase) :
return hosts
self.debug("Unable to determine DC from subject name: " + subject,3)
# No hostname match, maybe we have info from bluecat?
if 'bluecat' in self.data and subject in self.data['bluecat'] and self.data['bluecat'][subject]['object'] :
if 'locationCode' in self.data['bluecat'][subject]['object'] :
potentialdc = self.data['bluecat'][subject]['object']['locationCode'].split(' ')[-1]
elif 'parent' in self.data['bluecat'][subject] and 'name' in self.data['bluecat'][subject]['parent'] :
potentialdc = self.data['bluecat'][subject]['parent']['name'].split(' ')[0]
if 'bluecat' in self.hints and subject in self.hints['bluecat'] and self.hints['bluecat'][subject]['object'] :
if 'locationCode' in self.hints['bluecat'][subject]['object'] :
potentialdc = self.hints['bluecat'][subject]['object']['locationCode'].split(' ')[-1]
elif 'parent' in self.hints['bluecat'][subject] and 'name' in self.hints['bluecat'][subject]['parent'] :
potentialdc = self.hints['bluecat'][subject]['parent']['name'].split(' ')[0]
for host in self.hosts.keys() :
if potentialdc.lower() in host.lower() :
hosts.update({host:self.hosts[host]})
self.debug("Got additional host info from the BlueCat plugin!",3)
return hosts
return hosts

View File

@ -23,7 +23,7 @@ class ServiceDelegate(ServiceBase) :
self.debug('This will take a long time. Increase the verbosity level to watch the progress.',1)
self.progress = 0
self.goal = len(self.channels_small)
def perform_lookup(self,subject) :
def lookup(self,subject) :
channels = {}
guid = list(re.match('([0-9a-fA-F]*)', subject.replace('-','')).groups())
if len(guid) > 0 and len(guid[0]) == 32 :
@ -31,6 +31,8 @@ class ServiceDelegate(ServiceBase) :
else :
guid = None
self.debug('Subject does not appear to be a valid GUID',1)
self.error.append('Subject does not appear to be a valid GUID')
return None
for channel in self.channels_small :
self.progress += 1
resp = self.make_drm_request(channel,guid)
@ -51,5 +53,5 @@ class ServiceDelegate(ServiceBase) :
verbosity = 2
else :
verbosity = 3
self.debug('Got ' + str(drm_req.status_code) + ' for channel ' + channel_guid + ' for user ' + user_guid + ' (' + str(self.progress) + '/' + str(self.goal) + ')',verbosity)
self.debug('Got ' + str(drm_req.status_code) + ' for channel ' + str(channel_guid) + ' for user ' + str(user_guid) + ' (' + str(self.progress) + '/' + str(self.goal) + ')',verbosity)
return drm_req.status_code

View File

@ -33,7 +33,7 @@ class ServiceDelegate(ServiceBase) :
for connection in self.connections :
connection.close()
def perform_lookup(self,subject) :
def lookup(self,subject) :
search_command = "sudo -S cat /etc/openvpn/openvpn-*p.log | grep 'primary virtual IP for' | grep '" + subject + "' | tail -n 1"
for connection in self.connections :
try:
@ -54,7 +54,8 @@ class ServiceDelegate(ServiceBase) :
return_dictionary.update({item: matches[1]})
if return_dictionary is not {} :
return return_dictionary
else :
return None
except Exception as exception :
self.error.append("Unable to get results from ssh: " + str(exception))
pass

View File

@ -23,7 +23,7 @@ class ServiceDelegate(ServiceBase) :
self.debug("Searching Shodan for hosts...",1)
def perform_lookup(self,subject) :
def lookup(self,subject) :
if self._api_sessions :
session = next(self.get_api())
try :

68
plugins/vmware.py Normal file
View File

@ -0,0 +1,68 @@
from servicebase import ServiceBase
import socket
class ServiceDelegate(ServiceBase) :
def get_arguments(cls) :
"""Returns an array of information used to construct an argumentparser argument."""
return [ '-vm', '--vm', 'store_true', "Return DNS resolution about the subject (dns)" ]
def startup(self) :
self.resolver = dns.resolver.Resolver()
if 'resolver' in self.config :
self.resolver.nameservers.append(self.config['resolver'])
def lookup(self,subject) :
# Note : hostname lookups always use system resolver.
# TODO: Make host lookups use the config-sepcified resolver.
try :
subjecttuple = socket.gethostbyname_ex(subject)
# Can't make an IP of the subject
except ValueError :
try :
# Lookup with system
subjecttuple = socket.gethostbyname_ex(subject)
except socket.gaierror :
# Not a valid host or IP
self.error.append("Unable to resolve " + subject)
subjecttuple = [subject,[],['']]
# Can't make IPv4 out of it
except socket.gaierror :
self.error.append("Unable to resolve " + subject)
subjecttuple = [subject,[],['']]
# Get additional records
return_dict = {}
mx = self.get_records(subjecttuple[0],'MX')
ns = self.get_records(subjecttuple[0],'NS')
txt = self.get_records(subjecttuple[0],'TXT')
mx.sort()
ns.sort()
txt.sort()
if mx and len(mx) > 0 :
return_dict.update({'mx':mx})
if ns and len(ns) > 0 :
return_dict.update({'ns':ns})
if txt and len(txt) > 0 :
return_dict.update({'txt':txt})
# Write final dictionary
if len(subjecttuple[2]) > 0 and subjecttuple[2][0] != '':
return_dict['addresses'] = list(subjecttuple[2])
if subjecttuple[0] and subjecttuple[0] != '' :
return_dict['name'] = subjecttuple[0]
if len(subjecttuple[1]) > 0 :
return_dict['aliases'] = list(subjecttuple[1])
return return_dict
def get_records(self,subject,recordtype) :
recs = []
try :
results = self.resolver.query(subject,recordtype)
for result in results :
recs.append(result.to_text())
return recs
except Exception as exception :
return recs

View File

@ -7,36 +7,28 @@ class ServiceBase :
error = []
warn = []
_status = "Ready"
_subjects = []
def __init__(self,config,namespace,subjects,dossiercopy):
if not config or config == {} :
self._status = "Error"
else :
self.config = config
# The following lines indicate a problem in my understanding of Python's class structure.
# Why do new instances of subclasses inherit the written values of their superclass instance?
def __init__(self,hints={},config={}):
self.config = config
self.error = []
self.warn = []
self.data = dossiercopy
self.namespace = namespace
self._subjects = subjects
pass
self.hints = hints
pass
def _lookup_subjects(self) :
args = self.get_arguments()
if args :
args = args[1][2:]
if getattr(self.namespace,args) == False and getattr(self.namespace,'all') == False :
return None
finaldictionary = {}
self.startup()
for item in self._subjects :
finaldictionary.update({item:self.perform_lookup(item)})
self.shutdown()
return finaldictionary
def _set_config(self,config) :
"""Allows for post-initialization configuration"""
self.config = config
def _set_hints(self,hints) :
"""Allows for post-initialization configuration of hint data"""
self.hints = hints
def _cache_invalid(self) :
"""Returns a boolean representing whether or not a cached value from the last lookup should be invalidated."""
return False
def debug(self,message,verbosity_level) :
if self.namespace.debug and self.namespace.debug >= verbosity_level :
"""Prints the submitted message if the environment's verbosity level matches or is surpassed by the submitted level."""
if self.config['_namespace']['debug'] and self.config['_namespace']['debug'] >= verbosity_level :
print(message,file=sys.stderr)
@classmethod
@ -50,14 +42,8 @@ class ServiceBase :
"""Perform any setup that is needed to perform lookups, such as logging in or obtaining auth sessions."""
pass
def perform_lookup(self,host_tuple) :
"""Returns a dictionary with lookup information about the given IP or Hostname."""
# host_tuple is a socket triple (hostname, aliaslist, ipaddrlist) where hostname is
# the primary host name responding to the given ip_address, aliaslist is a (possibly
# empty) tuple of alternative host names for the same address, and ipaddrlist is a tuple
# of IPv4/v6 addresses for the same interface on the same host (most likely containing
# only a single address).
def lookup(self,subject) :
"""Returns a dictionary with lookup information about the given subject."""
# Note: You should only return arrays and dictionaries with strings as keys so the information
# serializes correctly.
@ -66,3 +52,8 @@ class ServiceBase :
def shutdown(self) :
"""Perform any cleanup that is needed, such as logging out of auth sessions."""
pass
def reset(self) :
"""Reinitialize the plugin, performing any involved operations."""
self.shutdown()
self.startup()