ifxlookup/plugins/bluecat.py

91 lines
3.7 KiB
Python
Raw Normal View History

from servicebase import ServiceBase
import ipaddress
2019-12-05 16:52:32 +00:00
import requests
import json
class ServiceDelegate(ServiceBase) :
2019-12-05 16:52:32 +00:00
current_auth_header = None
return_payload = {}
host = None
2019-12-04 19:00:50 +00:00
def get_arguments(cls) :
return ['-b', '--bluecat', 'store_true', "Return network information about the subject (bluecat)"]
2019-12-05 16:52:32 +00:00
def startup(self) :
for requirement in ['host','username','key'] :
if requirement not in self.config or (requirement in self.config and (self.config[requirement] is '' or type(self.config[requirement]) is not str)):
self.error.append('Missing required config option ' + requirement)
2019-12-05 16:52:32 +00:00
return
self.host = self.config['host']
self.current_auth_header = self.get_bc_auth_header(self.config['host'],self.config['username'],self.config['key'])
2019-12-11 18:48:31 +00:00
self.debug("Searching BlueCat for hosts...",1)
2019-12-05 16:52:32 +00:00
def perform_lookup(self,subject) :
2019-12-05 16:52:32 +00:00
self.return_payload = {}
if self.current_auth_header :
object = self.search_bc_object(subject)
2019-12-05 16:52:32 +00:00
self.return_payload.update({'object':object})
parent = self.search_bc_parent_object(subject)
2019-12-05 16:52:32 +00:00
self.return_payload.update({'parent':parent})
return self.return_payload
else :
return self.return_payload
2019-12-05 16:52:32 +00:00
def shutdown(self) :
if self.host and self.current_auth_header :
logout = requests.get("https://" + self.host + "/Services/REST/v1/logout",headers=self.current_auth_header)
if not logout.text.endswith("successfully logged out.\"") :
self.error.append('Unable to log out of BlueCat API session')
2019-12-05 16:52:32 +00:00
def get_bc_auth_header(self,host,username,password) :
auth_token = requests.get("https://" + host + "/Services/REST/v1/login?username=" + username + "&password=" + password).text[17:71]
if auth_token.startswith('BAMAuthToken') :
return { "Authorization" : auth_token }
else :
return None
def search_bc_object(self,subject) :
2019-12-05 16:52:32 +00:00
"""Searches BC for the subject."""
# Search by IP
response = requests.get("https://" + self.host + "/Services/REST/v1/searchByObjectTypes?keyword=" + subject + "&types=IP4Address,IP6Address&start=0&count=10",headers=self.current_auth_header)
2019-12-05 16:52:32 +00:00
return self.clean_response(response)
def search_bc_parent_object(self,subject) :
2019-12-05 16:52:32 +00:00
"""Search for and return information about the objects parent, if any"""
if 'object' in self.return_payload and type(self.return_payload['object']) is dict and 'id' in self.return_payload['object'] :
2019-12-05 16:52:32 +00:00
response = requests.get("https://" + self.host + "/Services/REST/v1/getParent?entityId=" + str(self.return_payload['object']['id']), headers=self.current_auth_header)
return self.clean_response(response)
# Make a guess that it belongs to a static net that hasn't been put in bluecat
elif subject != '' :
for net in ['/27', '/26', '/25', '/24', '/23', '/22'] :
try :
ip = ipaddress.IPv4Network(subject + net,strict=False)
network = str(ip.network_address)
response = requests.get("https://" + self.host + "/Services/REST/v1/searchByObjectTypes?keyword=" + network + "&types=IP4Network,IP4Block&start=0&count=10",headers=self.current_auth_header)
if not response.text.startswith('[]') :
return self.clean_response(response)
except:
pass
2019-12-05 16:52:32 +00:00
def clean_response(self,response) :
parsed = {}
try :
parsed = json.loads(response.text)
except Exception :
return parsed
if type(parsed) is list and len(parsed) > 0 :
parsed = parsed[0]
if type(parsed) is dict :
if 'properties' in parsed :
parsed["properties"] = parsed["properties"][:-1]
bad_delim_props = [x for x in parsed["properties"].split("|") if x.count('=') > 1]
if len(bad_delim_props) > 0 :
parsed.update({"unknown":bad_delim_props})
attributes = dict(x.split("=") for x in parsed["properties"].split("|") if x.count('=') == 1)
parsed.pop("properties", None)
parsed.update(attributes)
return parsed