headercheckdashboardproxy/bin/headercheckdashboardproxy
2024-11-06 22:12:46 -07:00

114 lines
4.3 KiB
Python

#!python3
import os
import io
import sys
import yaml
import logging
import argparse
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse
from pathlib import Path
from typing import Optional
import uvicorn
from uvicorn.workers import UvicornWorker
class LoggingFormatter(logging.Formatter):
    """Formatter that emits ``<time> [LEVEL]   [module]   <message>`` lines.

    The module column shows ``record.log_module`` when the record carries
    that attribute (for example via a ``LoggerAdapter`` ``extra`` mapping),
    otherwise ``<record.module>.<record.name>``. The column is truncated and
    padded to a fixed width so messages line up.
    """

    # Width of the module column, counting the closing bracket.
    MODULE_MAX_WIDTH = 30

    def format(self, record):
        """Return the formatted text for *record*.

        The timestamp is produced by ``formatTime`` using the ``datefmt``
        the formatter was constructed with (``None`` selects the logging
        default). Exception and stack information attached to the record
        is appended on the following lines, as ``logging.Formatter.format``
        does, so tracebacks are not lost.
        """
        width = self.MODULE_MAX_WIDTH
        level = f'[{record.levelname}]'.ljust(9)
        if hasattr(record, 'log_module'):
            modname = record.log_module
        else:
            modname = f'{record.module}.{record.name}'
        # Reserve one character of the column for the closing bracket.
        modname = (f'{modname}'[:width - 1] + ']').ljust(width)
        timestamp = self.formatTime(record, self.datefmt)
        final = f'{timestamp:<7} {level} [{modname} {record.getMessage()}'

        # Cache the rendered traceback on the record, mirroring the base class.
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            if not final.endswith('\n'):
                final += '\n'
            final += record.exc_text
        if record.stack_info:
            if not final.endswith('\n'):
                final += '\n'
            final += self.formatStack(record.stack_info)
        return final
# ASGI application object: the route handler below registers itself on it
# with @server.get, and the __main__ block passes it to uvicorn.run().
server = FastAPI()
def filter_html(html_path: Path, filter: dict) -> Path:
    """Print both arguments to stdout and return *html_path* unchanged.

    No filtering is performed here; the function only echoes its inputs.

    Args:
        html_path: Path of the HTML document.
        filter: Mapping describing the filter; it is printed, not applied.

    Returns:
        The same ``html_path`` object that was passed in.
    """
    print(html_path, filter)
    return html_path
def _split_csv(value: str) -> list[str]:
    """Split a comma-separated string into stripped, non-empty tokens."""
    return [token.strip() for token in value.split(',') if token.strip()]


def _render_filtered_index(index_path: Path, user: str, groups: list[str]) -> bytes:
    """Return the index page as UTF-8 bytes with disallowed services removed.

    Each ``<li class="service">`` may carry a ``users`` and/or a ``groups``
    attribute holding comma-separated names. A service is removed when its
    ``users`` list is not exactly ``Any`` and does not contain *user*, or
    when its ``groups`` list shares no entry with *groups*. Any
    ``<li class="category">`` left without a nested ``ul li`` is removed too.
    """
    # Context manager so the file handle is closed once parsing is done.
    with open(index_path) as index_handle:
        soup = BeautifulSoup(index_handle, 'html.parser')

    caller_groups = set(groups)
    for service in soup.find_all("li", class_='service'):
        if 'users' in service.attrs:
            allowed_users = _split_csv(service['users'])
            if allowed_users != ["Any"] and user not in allowed_users:
                service.decompose()
                continue
        if 'groups' in service.attrs:
            if caller_groups.isdisjoint(_split_csv(service['groups'])):
                service.decompose()

    # Drop categories that no longer contain any service entry.
    for category in soup.select('li.category:not(:has(ul li))'):
        category.decompose()
    return soup.encode('utf-8')


@server.get("/{file_path:path}")
async def serve_file(request: Request, file_path: str):
    """Serve files below the working directory, filtering the index page.

    The index page (the file named by ``INDEX_FILE``) is filtered per
    request using the user and group names read from the request headers
    named by ``USER_HEADER`` and ``GROUPS_HEADER``. It is served filtered
    both for the root path and for any path that resolves to the index
    file itself, so the unfiltered markup cannot be fetched directly.

    Header values and list attributes are split on commas; surrounding
    whitespace is ignored and empty entries are dropped, so a request
    without a user or groups header never matches an empty list entry.

    Args:
        request: Incoming request; only its headers are read.
        file_path: Path portion of the URL, relative to the working
            directory.

    Returns:
        A ``StreamingResponse`` with the filtered index page, or a
        ``FileResponse`` for any other regular file.

    Raises:
        HTTPException: 404 when the path escapes the working directory,
            is a directory, or does not exist.
    """
    base_path = Path.cwd()
    requested_path = (base_path / file_path).resolve()
    # Ensure the requested path is within the base path
    if not requested_path.is_relative_to(base_path):
        raise HTTPException(status_code=404, detail="Not Found")

    index_path = (base_path / os.environ['INDEX_FILE']).resolve()
    if file_path == '' or requested_path == index_path:
        user = (request.headers.get(os.environ['USER_HEADER']) or '').strip()
        groups = _split_csv(request.headers.get(os.environ['GROUPS_HEADER']) or '')
        markup = _render_filtered_index(index_path, user, groups)
        return StreamingResponse(io.BytesIO(markup), media_type="text/html")

    file_to_serve = base_path / file_path
    # Directories and missing paths are both reported as 404.
    if file_to_serve.is_file():
        return FileResponse(file_to_serve)
    raise HTTPException(status_code=404, detail="Not Found")
if __name__ == '__main__':
    # Command-line client
    # Gather Argument options
    EXAMPLE_TEXT = 'Example:\n\theadercheckdashboardproxy -h'
    parser = argparse.ArgumentParser(
        epilog=EXAMPLE_TEXT,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('-H', '--hosts', action='append', default=None, help='Collects arguments in an array.')
    parser.add_argument('-d', '--dry-run', action='store_true', help='Store the existence of a variable.')
    parser.add_argument('-l', '--log', action='store', help='Specify a file to log to.')
    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help="Include verbose information in the output. Add 'v's for more output.")
    args = parser.parse_args()

    log = logging.LoggerAdapter(
        logging.getLogger(__name__),
        {'log_module': 'headercheckdashboardproxy'},
    )

    # Configure logging
    # Verbosity 0..3 maps to ERROR..DEBUG; VERBOSITY_LEVEL overrides -v.
    log_options = [logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]
    verbosity = args.verbose or 0
    if 'VERBOSITY_LEVEL' in os.environ:
        try:
            verbosity = int(os.environ['VERBOSITY_LEVEL'])
        except ValueError:
            parser.error('VERBOSITY_LEVEL must be an integer')
    # Clamp at both ends so a negative value cannot index from the end.
    args.verbose = max(0, min(verbosity, len(log_options) - 1))

    root_logger = logging.getLogger()
    if args.log:
        logging.basicConfig(level=log_options[args.verbose], filename=args.log)
        root_logger.addHandler(logging.StreamHandler(sys.stderr))
    else:
        logging.basicConfig(level=log_options[args.verbose])
    # Apply the formatter to every handler so file and stderr output match.
    for handler in root_logger.handlers:
        handler.setFormatter(LoggingFormatter())

    # Check for missing environment variables
    required_vars = ['INDEX_FILE', 'GROUPS_HEADER', 'USER_HEADER', 'BIND_PORT']
    missing_vars = [name for name in required_vars if name not in os.environ]
    for required_var in missing_vars:
        log.error(f'Missing required ENV variable {required_var}')
    if missing_vars:
        sys.exit(1)

    # Main functions
    try:
        bind_port = int(os.environ['BIND_PORT'])
    except ValueError:
        log.error(f"BIND_PORT must be an integer, got {os.environ['BIND_PORT']!r}")
        sys.exit(1)
    uvicorn.run(server, host='0.0.0.0', port=bind_port)