import ipaddress
import json
from urllib.parse import quote

import requests

from servicebase import ServiceBase


class ServiceDelegate(ServiceBase):
    """Look up a subject (an IP address) in a BlueCat server over its REST v1 API.

    The delegate logs in during ``startup``, answers ``lookup`` calls with the
    matching address objects and their parents, and logs out in ``shutdown``.
    ``self._config``, ``self._error`` and ``self.debug`` are not defined in this
    file; they are presumably provided by ``ServiceBase``.
    """

    # Kept only for backward compatibility: nothing in this class reads it.
    current_auth_header = None
    # Authorization header dict returned by a successful login, else None.
    # Declared here so lookup()/shutdown() are safe when startup() bails out.
    _current_auth_header = None
    # Result of the most recent lookup() that did not return a list.
    _return_payload = {}
    # BlueCat host name taken from the 'host' config option.
    _host = None

    # Path prefix shared by every REST call made by this class.
    _API_ROOT = '/Services/REST/v1/'
    # Prefix lengths tried, smallest network first, when guessing the network
    # that contains an address BlueCat has no parent record for.
    _GUESS_PREFIXES = ('/27', '/26', '/25', '/24', '/23', '/22')

    def get_arguments(cls):
        """Return the option spec for this service.

        The list holds the short flag, the long flag, the string
        ``'store_true'`` and the help text, in that order.

        NOTE(review): the first parameter is named ``cls`` but the method is
        not decorated with ``@classmethod``; left as-is because the caller in
        ``servicebase`` is not visible here.
        """
        return ['-b', '--bluecat', 'store_true', 'Return network information about the subject (bluecat)']

    def startup(self):
        """Validate the configuration and log in to BlueCat.

        Each of ``host``, ``username`` and ``key`` must be present in
        ``self._config`` as a non-empty string; the first one that is not is
        reported in ``self._error`` and startup stops. A failed login is also
        reported in ``self._error``.
        """
        for requirement in ('host', 'username', 'key'):
            value = self._config[requirement] if requirement in self._config else None
            if not isinstance(value, str) or value == '':
                self._error.append('Missing required config option ' + requirement)
                return

        self._host = self._config['host']
        self._current_auth_header = self.get_bc_auth_header(
            self._config['host'], self._config['username'], self._config['key'])
        if self._current_auth_header:
            self.debug('Logged into BlueCat', 1)
        else:
            # Previously a failed login was still logged as a success.
            self._error.append('Unable to log in to BlueCat API')

    def lookup(self, subject):
        """Return what BlueCat knows about ``subject``.

        Returns:
            * a list of ``{'object': ..., 'parent': ...}`` dicts (``'parent'``
              only when one was found) when the search returns a non-empty
              list;
            * otherwise a dict with an ``'object'`` key (when the search
              returned a non-list result) and/or a ``'parent'`` key (when the
              network guess for ``subject`` found something);
            * ``None`` when there is no session or nothing was found.
        """
        self._return_payload = {}
        if not self._current_auth_header:
            return None

        objects = self.search_bc_objects(subject)
        if objects:
            if isinstance(objects, list):
                results = []
                for entry in objects:
                    result = {'object': entry}
                    parent = self.search_bc_parent_object(entry)
                    if parent:
                        result['parent'] = parent
                    results.append(result)
                return results
            self._return_payload['object'] = objects

        # No list of address objects: fall back to guessing the enclosing
        # network from the subject itself.
        parent = self.search_bc_parent_object(subject)
        if parent:
            self._return_payload['parent'] = parent
        return self._return_payload or None

    def shutdown(self):
        """Log out of BlueCat if a session was opened.

        Records an error in ``self._error`` when the response text does not
        end with the expected confirmation.
        """
        if self._host and self._current_auth_header:
            logout = requests.get(
                'https://' + self._host + self._API_ROOT + 'logout',
                headers=self._current_auth_header)
            if not logout.text.endswith('successfully logged out.\"'):
                self._error.append('Unable to log out of BlueCat API session')

    def get_bc_auth_header(self, host, username, password):
        """Log in and return the Authorization header dict, or None on failure.

        The credentials are percent-encoded so characters such as ``&``, ``#``
        or spaces cannot corrupt the query string.
        """
        login_url = (
            'https://' + host + self._API_ROOT + 'login'
            + '?username=' + quote(username, safe='')
            + '&password=' + quote(password, safe='')
        )
        # The token is taken from a fixed character window of the response
        # body and only accepted when it starts with 'BAMAuthToken'.
        # NOTE(review): this assumes a fixed-length token at a fixed offset -
        # confirm against the BlueCat version in use.
        auth_token = requests.get(login_url).text[17:71]
        if auth_token.startswith('BAMAuthToken'):
            return {'Authorization': auth_token}
        return None

    def search_bc_objects(self, subject):
        """Search BlueCat for IPv4/IPv6 address objects matching ``subject``.

        Returns the cleaned response (see ``clean_response``).
        """
        # Search by IP; at most the first 10 matches are requested.
        response = requests.get(
            'https://' + self._host + self._API_ROOT
            + 'searchByObjectTypes?keyword=' + quote(subject, safe='')
            + '&types=IP4Address,IP6Address&start=0&count=10',
            headers=self._current_auth_header)
        return self.clean_response(response)

    def search_bc_parent_object(self, object):
        """Return information about the object's parent, if any.

        ``object`` is either an entity dict carrying an ``'id'`` (its parent is
        requested directly) or an address string, in which case progressively
        larger networks containing it are searched for. Returns the cleaned
        response, or ``None`` when nothing is found or ``object`` is not an
        IPv4 address string.
        """
        if isinstance(object, dict) and 'id' in object:
            response = requests.get(
                'https://' + self._host + self._API_ROOT
                + 'getParent?entityId=' + str(object['id']),
                headers=self._current_auth_header)
            return self.clean_response(response)

        # Make a guess that it belongs to a static net that hasn't been put
        # in bluecat.
        if isinstance(object, str) and object != '':
            for prefix in self._GUESS_PREFIXES:
                try:
                    guess = ipaddress.IPv4Network(object + prefix, strict=False)
                except ValueError:
                    # The prefix part is always valid, so the address part is
                    # not IPv4 and no other prefix would parse either.
                    return None
                try:
                    response = requests.get(
                        'https://' + self._host + self._API_ROOT
                        + 'searchByObjectTypes?keyword=' + str(guess.network_address)
                        + '&types=IP4Network,IP4Block&start=0&count=10',
                        headers=self._current_auth_header)
                except requests.RequestException:
                    # Best effort, as before: a failed request just means
                    # this guess is skipped.
                    continue
                if not response.text.startswith('[]'):
                    return self.clean_response(response)
        return None

    def clean_response(self, response):
        """Parse a JSON response and flatten each entity's ``properties``.

        ``properties`` is a ``|``-delimited string of ``key=value`` pairs. Each
        well-formed pair becomes a key of the entity; segments containing more
        than one ``=`` are collected under ``'unknown'``; the ``properties``
        key itself is removed.

        Returns:
            the cleaned list for a non-empty JSON list, the cleaned dict for a
            JSON object, ``{}`` when the body is not valid JSON, and ``None``
            for anything else (including an empty list).
        """
        def clean(item):
            if not isinstance(item, dict) or 'properties' not in item:
                return item
            properties = item.pop('properties')
            if not isinstance(properties, str):
                # A null value is simply dropped; any other non-string value
                # is put back untouched rather than lost.
                if properties is not None:
                    item['properties'] = properties
                return item
            # Only strip the final character when it really is the trailing
            # delimiter, so a value is never truncated.
            if properties.endswith('|'):
                properties = properties[:-1]
            segments = properties.split('|')
            bad_delim_props = [x for x in segments if x.count('=') > 1]
            if bad_delim_props:
                item['unknown'] = bad_delim_props
            item.update(dict(x.split('=') for x in segments if x.count('=') == 1))
            return item

        try:
            parsed = json.loads(response.text)
        except (ValueError, TypeError):
            return {}

        if isinstance(parsed, list) and parsed:
            return [clean(item) for item in parsed]
        if isinstance(parsed, dict):
            return clean(parsed)
        return None