from servicebase import ServiceBase
import ipaddress
import socket
import requests
import xml.etree.ElementTree as ElementTree

# Rule attributes whose values are searched for the subject during lookup().
RULE_SEARCH_FIELDS = ['to','from','source','destination']

# Common xpath prefix of every configuration section requested from a device.
_VSYS_XPATH = "/config/devices/entry[@name='localhost.localdomain']/vsys/entry[@name='vsys1']/"


class ServiceDelegate(ServiceBase):
    """Finds firewall addresses, address groups and rules that relate to a subject.

    The delegate relies on members provided outside this file (presumably by
    ServiceBase): ``self._config`` (mapping with 'hosts', 'username', 'key'),
    ``self._error`` (list of error strings) and ``self.debug(message, level)``.
    """

    # Seconds to wait for each HTTP request to a device.
    REQUEST_TIMEOUT = 3

    # NOTE(security): certificate verification is disabled, exactly as in the
    # previous implementation, so the login password and API key are sent to an
    # unauthenticated peer. Set this to True (or a CA bundle path) once the
    # devices present certificates that can be validated.
    VERIFY_TLS = False

    def get_arguments(cls):
        """Returns an array of information used to construct an argumentparser argument.

        Example return:
            ['-n', '--net', 'store_true', 'Return network information about the subject']

        NOTE(review): the parameter is named ``cls`` but no ``@classmethod``
        decorator is present; the signature is left untouched so existing
        callers keep working. Confirm how the framework invokes this.
        """
        return ['-fw', '--rules', 'store_true', 'Return Firewall rules relating to the subject']

    def startup(self):
        """Logs into every configured host and caches its addresses, groups and rules.

        State is initialised before the configuration is validated so that
        lookup() works (and simply finds nothing) after a failed startup.
        Hosts for which no API key could be obtained are skipped instead of
        raising KeyError.
        """
        self._keys = {}
        self._addresses = {}
        self._address_groups = {}
        self._rules = {}
        self._parsed_addresses = None  # built lazily by lookup()

        if not self.config_valid():
            return

        self.debug('Logging into PA devices...',1)
        self._keys = self.login_to_hosts()
        self.debug('Finished getting PA login keys.',2)

        phases = (
            ('addresses', self.get_addresses, self._addresses),
            ('address groups', self.get_address_groups, self._address_groups),
            ('rules', self.get_rules, self._rules),
        )
        for label, fetch, store in phases:
            self.debug('Getting PA ' + label + '...',1)
            for host in self._config['hosts']:
                key = self._keys.get(host)
                if not key:
                    # login_to_hosts() only returns hosts that authenticated.
                    self.debug('Skipping ' + host + ': no API key available.',4)
                    continue
                self.debug('Retrieving ' + label + ' from ' + host + '...',4)
                store[host] = fetch(host, key)
            self.debug('Finished getting PA ' + label + '...',2)

    def lookup(self, subject):
        """Returns everything on the firewalls that relates to ``subject``.

        ``subject`` is a host name or IP string. Matching is done in three
        ordered passes so that each pass can use the previous one:

        1. address objects whose name contains the subject, or whose value
           (IP, network, range or resolved host) covers the subject's IP;
        2. address groups whose name contains the subject, or which have a
           matching address object as a member;
        3. rules whose name contains the subject, or whose RULE_SEARCH_FIELDS
           values contain the subject or name a matching address/group.

        Returns a dict ``{host: {'addresses': [...], 'groups': [...],
        'rules': {rule_type: {rule_name: rule}}}}`` containing only hosts and
        categories that matched, or None when nothing matched.
        """
        subject_ip = self._resolve_subject(subject)
        parsed_addresses = self._get_parsed_addresses()

        # Pass 1: address objects.
        included_addrs = {}
        for fw, entries in parsed_addresses.items():
            included_addrs[fw] = [
                name for name, parsed in entries.items()
                if subject in name or self._address_matches(subject_ip, parsed)
            ]

        # Pass 2: address groups (group names, not member names).
        included_groups = {}
        for fw, groups in self._address_groups.items():
            matched_addrs = set(included_addrs.get(fw, ()))
            included_groups[fw] = [
                name for name, members in groups.items()
                if subject in name or any(member in matched_addrs for member in members)
            ]

        # Pass 3: rules, keyed by rule type; empty rule types are omitted.
        included_rules = {}
        for fw, rulebase in self._rules.items():
            related_names = set(included_addrs.get(fw, ())) | set(included_groups.get(fw, ()))
            included_rules[fw] = {}
            for rule_type, rules in rulebase.items():
                matches = {
                    rule_name: rule for rule_name, rule in rules.items()
                    if subject in rule_name or self._rule_matches(rule, subject, related_names)
                }
                if matches:
                    included_rules[fw][rule_type] = matches

        # Format results: one entry per host, only for non-empty categories.
        result = {}
        for host in self._config['hosts']:
            entry = {}
            if included_addrs.get(host):
                entry['addresses'] = included_addrs[host]
            if included_groups.get(host):
                entry['groups'] = included_groups[host]
            if included_rules.get(host):
                entry['rules'] = included_rules[host]
            if entry:
                result[host] = entry
        return result or None

    def _resolve_subject(self, subject):
        """Resolves the subject to an IP object; records an error and returns None on failure."""
        try:
            resolved = socket.gethostbyname_ex(subject)
            return self.get_ips_from_cruft(resolved[2][0])
        except (ValueError, IndexError, socket.gaierror, socket.herror):
            # Not a valid host or IP
            self._error.append('Unable to resolve ' + subject)
            return None

    def _get_parsed_addresses(self):
        """Returns {firewall: {address_name: parsed_value}}, parsing the raw strings once.

        Address strings that cannot be interpreted are left out. The raw
        ``self._addresses`` mapping is not modified, so repeated lookups see
        the same data.
        """
        cached = getattr(self, '_parsed_addresses', None)
        if cached is None:
            cached = {}
            for fw, entries in self._addresses.items():
                cached[fw] = {}
                for name, raw in entries.items():
                    self.debug(raw,4)
                    parsed = self.get_ips_from_cruft(raw)
                    if parsed is not None:
                        cached[fw][name] = parsed
            self._parsed_addresses = cached
        return cached

    @staticmethod
    def _address_matches(subject_ip, parsed):
        """Tells whether ``subject_ip`` equals or lies within a parsed address value."""
        if subject_ip is None:
            return False
        if isinstance(parsed, (list, ipaddress.IPv4Network, ipaddress.IPv6Network)):
            return subject_ip in parsed
        # A single address object: containment would raise TypeError, compare instead.
        return subject_ip == parsed

    @staticmethod
    def _rule_matches(rule, subject, related_names):
        """Tells whether any searched rule field mentions the subject or a related object name."""
        for field in RULE_SEARCH_FIELDS:
            value = rule.get(field)
            if value is None:
                continue
            # get_rules() stores a field either as one string or a list of strings.
            elements = value if isinstance(value, list) else [value]
            for element in elements:
                if subject in element or element in related_names:
                    return True
        return False

    def shutdown(self):
        """Nothing to release."""
        pass

    def get_ips_from_cruft(self, cruft):
        """Turns an address string into ipaddress objects.

        The interpretations are tried in this order and the first that works
        wins: single IP, network (host bits allowed), range ('start-end' or,
        for IPv4, 'start-N' where N replaces the last octet), host name
        resolved through DNS.

        Returns an IPv4Address/IPv6Address, an IPv4Network/IPv6Network, a list
        of addresses (for ranges), or None when nothing fits.

        NOTE: a range is expanded into one object per address, so very large
        ranges are expensive.
        """
        def make_ip(text):
            try:
                return ipaddress.ip_address(text)
            except (ValueError, TypeError):
                return None

        def make_net(text):
            try:
                return ipaddress.ip_network(text, strict=False)
            except (ValueError, TypeError):
                return None

        def make_range(text):
            if not isinstance(text, str):
                return None
            parts = text.split('-')
            if len(parts) < 2:
                return None
            start_ip = make_ip(parts[0])
            if start_ip is None:
                return None
            start = int(start_ip)

            end_ip = make_ip(parts[1])
            if end_ip is not None:
                # Both ends must belong to the same address family.
                if end_ip.version != start_ip.version:
                    return None
                end = int(end_ip)
            else:
                # Short form: the end is a number replacing the last octet.
                if start_ip.version != 4:
                    return None
                try:
                    last_octet = int(parts[1])
                except ValueError:
                    return None
                end = start - (start & 0xFF) + last_octet

            if end < start:
                return None
            address_class = start_ip.__class__
            try:
                return [address_class(value) for value in range(start, end + 1)]
            except ValueError:
                # The end ran past the address space.
                return None

        def make_host(text):
            try:
                resolved = socket.gethostbyname_ex(text)
                return make_ip(resolved[2][0])
            except (OSError, ValueError, IndexError, TypeError):
                return None

        if cruft is None or cruft == '':
            return None
        for parser in (make_ip, make_net, make_range, make_host):
            parsed = parser(cruft)
            if parsed is not None:
                return parsed
        return None

    def _fetch_config_section(self, host, key, section):
        """Requests one configuration section from a host and returns its XML element.

        The element returned is the first grandchild of the response root,
        which is where the requested section sits in the responses this
        class parses. Returns None (after recording an error) when the
        request or the XML parsing fails, and None when the response holds no
        such element.

        NOTE(review): configuration requests go to the host *name* while
        get_api_key() talks to the host's configured ip — confirm that both
        reach the same device.
        """
        params = {
            'type': 'config',
            'action': 'get',
            'xpath': _VSYS_XPATH + section,
            'key': key,
        }
        try:
            # params= lets requests escape the key; it is never spliced in raw.
            response = requests.get('https://' + host + '/api/', params=params,
                                    verify=self.VERIFY_TLS, timeout=self.REQUEST_TIMEOUT)
            root = ElementTree.fromstring(response.text)
        except (requests.exceptions.RequestException, ElementTree.ParseError) as exception:
            # Only the exception class is reported: its message can embed the
            # request URL, which contains the API key.
            message = ('Unable to retrieve ' + section + ' from PA device ' + host
                       + ': ' + exception.__class__.__name__)
            self._error.append(message)
            self.debug(message,1)
            return None
        if len(root) == 0 or len(root[0]) == 0:
            return None
        return root[0][0]

    def get_address_groups(self,host,key):
        """Retrieves a list of address groups.

        Returns {group_name: [member, ...]}. For a 'static' child the members
        are the texts of its children; for a 'dynamic' child the single member
        is the text of its first child with surrounding single quotes removed.
        """
        section = self._fetch_config_section(host, key, 'address-group')
        groups = {}
        if section is None:
            return groups
        for entry in section:
            name = entry.get('name')
            if name is None:
                continue
            members = []
            for child in entry:
                if child.tag == 'dynamic':
                    if len(child) > 0 and child[0].text:
                        members.append(child[0].text.strip('\''))
                elif child.tag == 'static':
                    members.extend(member.text for member in child if member.text)
            groups[name] = members
        return groups

    def get_addresses(self,host,key):
        """Retrieves a list of saved address objects from the host.

        Returns {address_name: text of the entry's first child}. Entries
        without a name or without children are skipped.
        """
        # TODO: Verify token is valid before making requests
        section = self._fetch_config_section(host, key, 'address')
        addresses = {}
        if section is None:
            return addresses
        for entry in section:
            name = entry.get('name')
            if name is None or len(entry) == 0:
                continue
            addresses[name] = entry[0].text
        return addresses

    def get_rules(self,host,key):
        """Retrieves a list of security rules from the host.

        Returns {rule_type: {rule_name: {attribute: value}}} where rule_type
        is the tag of each child of the rulebase element and value is a string
        or a list of strings (see _attribute_value).
        """
        section = self._fetch_config_section(host, key, 'rulebase')
        rulebase = {}
        if section is None:
            return rulebase
        for rule_type in section:
            entries = {}
            # The rule entries live under the first child of each rule type.
            if len(rule_type) > 0:
                for entry in rule_type[0]:
                    name = entry.get('name')
                    if name is None:
                        continue
                    entries[name] = {
                        attribute.tag: self._attribute_value(attribute)
                        for attribute in entry
                    }
            rulebase[rule_type.tag] = entries
        return rulebase

    @staticmethod
    def _attribute_value(attribute):
        """Extracts a rule attribute: its own text, else its children's texts.

        A single child text is returned as a plain string, several as a list;
        an attribute with neither text nor child texts yields an empty list.
        Surrounding single quotes are removed.
        """
        if attribute.text and attribute.text.strip() != '':
            return attribute.text.strip('\'')
        values = [child.text.strip('\'') for child in attribute if child.text]
        if len(values) == 1:
            return values[0]
        return values

    def get_api_key(self,user,key,hostname,ip):
        """Given authentication information, return an authenticated API key for use in REST requests to a Palo Alto device.

        ``key`` is the account password, ``hostname`` is only used in
        messages and ``ip`` is the address the request is sent to. Returns the
        API key string, or None after recording an error when the device
        cannot be reached, answers with a non-200 status or sends a response
        that cannot be parsed.
        """
        params = {'type': 'keygen', 'user': user, 'password': key}
        try:
            # params= escapes the credentials; a raw '&' or '#' in the
            # password no longer corrupts the request.
            key_payload = requests.get('https://' + str(ip) + '/api/', params=params,
                                       verify=self.VERIFY_TLS, timeout=self.REQUEST_TIMEOUT)
        except requests.exceptions.Timeout:
            self._error.append('Unable to log into PA device ' + hostname + ': timed out.')
            self.debug('Unable to log into PA device ' + hostname + ': timed out.',1)
            return None
        except requests.exceptions.RequestException as exception:
            # Report the class only: the message can embed the URL and password.
            message = ('Unable to log into PA device ' + hostname + ': '
                       + exception.__class__.__name__)
            self._error.append(message)
            self.debug(message,1)
            return None

        if key_payload.status_code != 200:
            self._error.append('Unable to log into PA device ' + hostname)
            self.debug('Unable to log into PA device ' + hostname,1)
            self.debug(key_payload.text,2)
            return None

        try:
            return ElementTree.fromstring(key_payload.text)[0][0].text
        except (ElementTree.ParseError, IndexError) as exception:
            self._error.append('Unable to log into PA device ' + hostname + ': invalid response.')
            self.debug('Unable to parse response from PA device ' + hostname + ':',1)
            self.debug(str(exception),2)
            return None

    def login_to_hosts(self):
        """Get API keys for all hosts.

        Returns {host: api_key} for the hosts that authenticated; hosts whose
        login failed are left out.
        """
        keys = {}
        for host, ip in self._config['hosts'].items():
            self.debug('Logging into ' + host + '...',3)
            api_key = self.get_api_key(self._config['username'], self._config['key'], host, ip)
            if api_key:
                keys[host] = api_key
        return keys

    def config_valid(self):
        """Checks that 'hosts', 'username' and 'key' are configured and non-empty.

        Records an error for the first missing option and returns False;
        returns True when all three are present.
        """
        for requirement in ['hosts','username','key']:
            # Compare by value: "is ''" tests identity and is unreliable.
            if requirement not in self._config or self._config[requirement] in (None, ''):
                self._error.append('Missing required config option ' + requirement)
                return False
        return True