ifxlookup/plugins/paloalto.py

325 lines
12 KiB
Python

from servicebase import ServiceBase
import ipaddress
import socket
import requests
import xml.etree.ElementTree as ElementTree
RULE_SEARCH_FIELDS = ['to','from','source','destination']
class ServiceDelegate(ServiceBase) :
def get_arguments(cls) :
"""Returns an array of information used to construct an argumentparser argument."""
return ['-fw', '--rules', 'store_true', 'Return Firewall addresses, groups, and rules relating to the subject']
def startup(self) :
if not self.config_valid() :
return
else :
key = self._config['key']
self._keys = {}
self._addresses = {}
self._address_groups = {}
self._rules = {}
# Log in
self.debug('Logging into PA devices...',1)
self._keys = self.login_to_hosts()
self.debug('Finished getting PA login keys.',2)
# Get Addresses
self.debug('Getting PA addresses...',1)
for host,ip in self._config['hosts'].items() :
self.debug('Retrieving addresses from ' + host + '...',4)
self._addresses.update({host:self.get_addresses(host,self._keys[host])})
self.debug('Finished getting PA addresses...',2)
# Get Groups
self.debug('Getting PA address groups...',1)
for host,ip in self._config['hosts'].items() :
self.debug('Retrieving address groups from ' + host + '...',4)
self._address_groups.update({host:self.get_address_groups(host,self._keys[host])})
self.debug('Finished getting PA address groups...',2)
# Get Rules
self.debug('Getting PA rules...',1)
for host,ip in self._config['hosts'].items() :
self.debug('Retrieving rules from ' + host + '...',4)
self._rules.update({host:self.get_rules(host,self._keys[host])})
self.debug('Finished getting PA rules...',2)
def lookup(self, subject) :
result = {}
try :
subjecttuple = socket.gethostbyname_ex(subject)
subjecttuple = self.get_ips_from_cruft(subjecttuple[2][0])
# Can't make an IP of the subject
except (ValueError,socket.gaierror) :
# Not a valid host or IP
self._error.append('Unable to resolve ' + subject)
subjecttuple = None
# Make nets and IPs out of address strings
formatted_addresses = {}
for fw in self._addresses.keys() :
formatted_addresses.update({fw:{}})
for address in self._addresses[fw].keys() :
self.debug(self._addresses[fw][address],4)
nets_or_ips = self.get_ips_from_cruft(self._addresses[fw][address])
if nets_or_ips :
if isinstance(nets_or_ips,list) :
for ip in nets_or_ips :
formatted_addresses[fw][str(ip)] = ip
else :
formatted_addresses[fw][address] = nets_or_ips
else :
# Remove the item from the list of addresses since we can't figure out what it is.
pass
self._addresses = formatted_addresses
# Iterate over our results.
# The reason we do this by category and not by firewall is that we want all of our matching addresses
# to be available before our address group checks, and our address group checks to be available before
# our rule checks.
# Determine if subject is in address space
included_addrs = {}
for fw in self._addresses.keys() :
included_addrs.update({fw:[]})
for name, address in self._addresses[fw].items() :
# Include address if IP of the subject is in the net or if subject name in name of address
try :
if subjecttuple and subjecttuple in address :
included_addrs[fw].append(name)
continue
except :
continue
if subject.lower() in name.lower() :
included_addrs[fw].append(name)
# Determine which address groups are included
included_groups = {}
for fw in self._address_groups.keys() :
included_groups.update({fw:[]})
for name, addresses in self._address_groups[fw].items() :
if subject.lower() in name.lower():
included_groups[fw].append(name)
included_groups[fw] += [x for x in addresses if x in [y for y in included_addrs[fw]]]
# Aggregate relevant rules
included_rules = {}
for fw in self._rules.keys() :
included_rules.update({fw:{}})
for type in self._rules[fw] :
included_rules[fw].update({type:{}})
for rule_name, rule in self._rules[fw][type].items() :
if subject.lower() in rule_name.lower() :
included_rules[fw][type].update({rule_name:rule})
for field in RULE_SEARCH_FIELDS :
for item in rule :
if isinstance(rule[item],list) :
for element in item :
if subject .lower() in element.lower() :
included_rules[fw][type].update({rule_name:rule})
continue
if fw in included_addrs and len(included_addrs) > 0 and element in [x for x in included_addrs[fw]] :
included_rules[fw][type].update({rule_name:rule})
if isinstance(rule[item],str) :
if subject.lower() in rule[item].lower() or len([x for x in included_addrs[fw] if x.lower() in rule[item].lower()]) > 0:
included_rules[fw][type].update({rule_name:rule})
# Format results
# TODO clean this up you're embarrasing me
for host in self._config['hosts'] :
if host in included_addrs and len([included_addrs[fw] for fw in included_addrs if len(included_addrs[fw]) > 0]) > 0 :
if host not in result :
result.update({host:{}})
if len(included_addrs[host]) > 0 :
result[host].update({'addresses':included_addrs[host]})
if host in included_groups and len([included_groups[fw] for fw in included_groups if len(included_groups[fw]) > 0]) > 0 :
if host not in result :
result.update({host:{}})
if len(included_groups[host]) > 0 :
result[host].update({'groups':included_groups[host]})
if host in included_rules and len([included_rules[fw] for fw in included_rules if len([type for type in included_rules[fw] if len(type) > 0]) > 0]) > 0 :
if host not in result :
result.update({host:{}})
rules = {'rules':{x:included_rules[host][x] for x in included_rules[host] if len(included_rules[host][x]) > 0}}
if len(rules['rules']) > 0 :
result[host].update(rules)
if len(result[host]) == 0 :
result.pop(host,None)
if len(result) > 0 :
return result
else :
return None
def shutdown(self) :
pass
def get_ips_from_cruft(self, cruft) :
def make_net(potential_net) :
try :
address = ipaddress.IPv4Network(potential_net,strict=False)
return address
except (ipaddress.AddressValueError,ValueError) as ip4error:
try :
address = ipaddress.IPv6Network(potential_net,strict=False)
return address
except :
return None
def make_ip(potential_ip) :
try :
address = ipaddress.IPv4Address(potential_ip)
return address
except :
try :
address = ipaddress.IPv6Address(potential_ip)
return address
except :
return None
def make_range(potential_range) :
try :
items = potential_range.split('-')
if len(items) > 0 :
startip = make_ip(items[0])
if not startip :
return None
start = int(startip)
# If the end of the range is an IP
try :
endip = make_ip(items[1])
if not endip :
raise Exception
# If the end of the range is a number
except :
try :
octet = startip.exploded.replace(':','.').split('.')[-1]
octet = int(octet)
endip = start + int(items[1]) - octet
except :
return None
finally :
end = int(endip)
result = []
# Get all IPs between the two
difference = end - start
for i in range(0,difference + 1) :
result.append(ipaddress.IPv4Address(start + i))
if len(result) > 0 :
return result
else :
return None
except Exception as e:
return None
def make_host(potential_host) :
try :
hosttuple = socket.gethostbyname_ex(potential_host)
return make_ip(hosttuple[2][0])
except Exception as e:
return None
# This section MUST be declared after local methods have been declared.
for type in ['ip','net','range','host'] :
try :
function = locals()['make_' + type]
result = function(cruft)
if result :
return result
except Exception as e:
return None
return None
def get_address_groups(self,host,key) :
"""Retrieves a list of address groups."""
response = requests.get('https://' + host + '/api/?type=config&action=get&xpath=%2Fconfig%2Fdevices%2Fentry%5B%40name%3D%27localhost.localdomain%27%5D%2Fvsys%2Fentry%5B%40name%3D%27vsys1%27%5D%2Faddress-group&key=' + key,verify=False,timeout=3)
root = ElementTree.fromstring(response.text)
result = {}
if len(root[0]) > 0 :
for element in root[0][0] :
name = element.attrib['name']
members = []
for subelement in element :
if subelement.tag == 'dynamic' :
members.append(subelement[0].text.strip('\''))
if subelement.tag == 'static' :
for member in subelement :
members.append(member.text)
result.update({name:members})
return result
def get_addresses(self,host,key) :
"""Retrieves a list of saved address objects from the host."""
# TODO: Verify token is valid before making requests
response = requests.get('https://' + host + '/api/?type=config&action=get&xpath=%2Fconfig%2Fdevices%2Fentry%5B%40name%3D%27localhost.localdomain%27%5D%2Fvsys%2Fentry%5B%40name%3D%27vsys1%27%5D%2Faddress&key=' + key,verify=False,timeout=3)
root = ElementTree.fromstring(response.text)
result = {}
if len(root[0]) > 0 :
for element in root[0][0] :
result.update({element.attrib['name']:element[0].text})
return result
def get_rules(self,host,key) :
"""Retrives a list of secrutiy rules from the host."""
response = requests.get('https://' + host + '/api/?type=config&action=get&xpath=%2Fconfig%2Fdevices%2Fentry%5B%40name%3D%27localhost.localdomain%27%5D%2Fvsys%2Fentry%5B%40name%3D%27vsys1%27%5D%2Frulebase&key=' + key,verify=False,timeout=3)
# XML Sucks.
root = ElementTree.fromstring(response.text)
result = {}
if len(root[0]) > 0 :
for tag in root[0][0] :
tag_name = tag.tag
entries = {}
for entry in tag[0] :
name = entry.attrib['name']
attributes = {}
for attribute in entry :
attribute_name = attribute.tag
if attribute.text and attribute.text.strip() != '' :
attribute_value = attribute.text.strip('\'')
else :
values = []
for value in attribute :
if value.text :
values.append(value.text.strip('\''))
if len(values) == 1 :
values = values[0].strip('\'')
attribute_value = values
attributes.update({attribute_name:attribute_value})
entries.update({name:attributes})
result.update({tag_name:entries})
return result
def get_api_key(self,user,key,hostname,ip) :
"""Given authentication information, return an authenticated API key for use in REST requests to a Palo Alto device."""
try :
key_payload = requests.get('https://' + str(ip) + '/api/?type=keygen&user=' + user + '&password=' + key,verify=False,timeout=3)
except Exception as exception :
if type(exception) == requests.exceptions.ReadTimeout :
self._error.append('Unable to log into PA device ' + hostname + ': timed out.')
self.debug('Unable to log into PA device ' + hostname + ': timed out.',1)
return None
if key_payload.status_code != 200 :
self._error.append('Unable to log into PA device ' + hostname)
self.debug('Unable to log into PA device ' + hostname,1)
self.debug(key_payload.text,2)
else :
key_tree = key_payload.text
try :
key_data = ElementTree.fromstring(key_tree)[0][0].text
except Exception as exception :
self._error.append('Unable to log into PA device ' + hostname + ': invalid response.')
self.debug('Unable to parse response from PA device ' + hostname + ':',1)
self.debug(exception.text,2)
key_data = None
return key_data
def login_to_hosts(self) :
"""Get API keys for all hosts."""
keys = {}
for host,ip in self._config['hosts'].items() :
self.debug('Logging into ' + host + '...',3)
key = self.get_api_key(self._config['username'],self._config['key'],host,ip)
if key :
keys.update({host: key})
return keys
def config_valid(self) :
for requirement in ['hosts','username','key'] :
if requirement not in self._config or (requirement in self._config and self._config[requirement] == ''):
self._error.append('Missing required config option ' + requirement)
return False
return True