bluebubbles-bot/persona/persona.py
Daniel Dayley 395e679de5
All checks were successful
git.cronocide.net/bluebubbles-bot/pipeline/head This commit looks good
Removed some cruft code
2023-05-01 10:22:37 -06:00

145 lines
5.5 KiB
Python

from __future__ import annotations
import os
import inspect
import logging
import importlib
from typing import List
from dataclasses import dataclass
import datetime
import queue
import threading
import time
class LoggingFormatter(logging.Formatter):
    """Formatter that renders records as ``<time> [LEVEL] [<source>] <message>``.

    The source label comes from the record's ``log_module`` attribute when it
    is present (for example when supplied through a ``LoggerAdapter`` extra
    dict); otherwise it is ``<record.module>.<record.name>``.
    """

    def format(self, record):
        """Return the single-line text representation of ``record``.

        The level tag is left-justified to 9 characters and the source label
        (including its closing bracket) is truncated/padded to exactly 30
        characters so that messages line up in columns.
        """
        label_width = 30

        # NOTE(review): a '%Y/%m/%d/ %H:%M:%S' layout was declared here at one
        # point but never applied; timestamps follow self.datefmt (the
        # logging.Formatter default when it is None). Confirm which is intended.
        if hasattr(record, 'log_module'):
            source = record.log_module
        else:
            source = str(record.module) + '.' + str(record.name)

        # Reserve one character of the column for the closing bracket.
        label = (str(source)[:label_width - 1] + ']').ljust(label_width)
        severity = '[{}]'.format(record.levelname).ljust(9)
        timestamp = self.formatTime(record, self.datefmt)
        return f"{timestamp:<7} {severity} [{label} {record.getMessage()}"
class Persona:
    """A class that represents state data for the chatbot. This is the personality for the bot.

    Personas load skills that allow them to execute different commands. The personas can receive
    state data from the skills to affect their current context.

    The registries below are class attributes that are mutated in place, so they are
    shared by every Persona instance in the process. ``outbound_messages`` is shared
    as well unless a queue is passed to ``__init__``.
    """

    # Skill name -> skill class, as registered through add_skill().
    _skill_map = {}
    # Not used by any method of this class; kept for external users.
    _delegate_map = {}
    # Skill classes considered initialized; entries are dropped when a skill is reloaded.
    _initialized_skills = []
    # Class name a skill module must expose (matched anywhere in a class's MRO).
    skill_class = 'PersonaSkill'
    # Skill name -> started skill instance, filled by startup().
    ready_skills = {}
    # Responses waiting to be delivered by whoever consumes this queue.
    outbound_messages = queue.Queue()

    def __init__(self, multiprocess_queue=None):
        """Configure logging, load every skill found on disk and start the async poller.

        Args:
            multiprocess_queue: Optional queue-like object that replaces the shared
                ``outbound_messages`` queue for this instance.
        """
        # Enable logging
        self.log = logging.LoggerAdapter(logging.getLogger(__name__), {'log_module': 'persona'})
        # Only restyle the root logger when it actually has a handler; indexing an
        # empty handler list would raise IndexError before the bot could start.
        root_handlers = logging.getLogger().handlers
        if root_handlers:
            root_handlers[0].setFormatter(LoggingFormatter())

        # Configure multiprocessing if requested
        if multiprocess_queue is not None:
            self.outbound_messages = multiprocess_queue

        # Load skills from <parent of this file's directory>/persona/skills
        package_parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
        available_skills = self.search_skills(
            directory=os.path.join(package_parent, 'persona', 'skills')
        )
        self.use_skills(list(available_skills.values()))
        self.startup()

        # Kickoff async message generation checker
        message_generator = threading.Thread(target=self.send_new_message, daemon=True)
        message_generator.start()

    # loading code
    def _is_skill_class(self, candidate):
        """Return True when ``candidate`` is a class with ``skill_class`` in its MRO."""
        if not inspect.isclass(candidate):
            return False
        return any(base.__name__ == self.skill_class for base in inspect.getmro(candidate))

    def add_skill(self, skill, reload):
        """Adds a given skill class, replacing an existing one of the same name only if ``reload`` is set.

        The skill name is the last dotted component of ``skill.__module__``.
        """
        skill_name = skill.__module__.split('.')[-1]
        if not reload and skill_name in self._skill_map:
            return
        # We can't startup the skill here because it hasn't been configured. We'll handle that at runtime.
        try:
            # Remove any initialized objects of the same name, forcing a reinitialization
            self._initialized_skills.remove(self._skill_map[skill_name])
        except (KeyError, ValueError):
            # Nothing registered under this name yet, or it was never marked initialized.
            pass
        self._skill_map[skill_name] = skill

    def use_skills(self, skills, reload=False):
        """Defines skills that should be used in a lookup, optionally forcing them to reload.

        Args:
            skills: List whose items are skill classes or skill names (str). Names are
                resolved through ``search_skills()`` using its default directory.
            reload: When true, already-registered skills are replaced.

        Raises:
            ValueError: ``skills`` is not a list, or an item is neither a name nor a skill class.
            FileNotFoundError: A named skill could not be found.
        """
        # Verify data
        if not isinstance(skills, list):
            raise ValueError('argument \'skills\' should be of type list')
        for skill in skills:
            if isinstance(skill, str):
                # Find skills by name using a default path
                matches = [
                    found for name, found in self.search_skills().items()
                    if name == skill and self._is_skill_class(found)
                ]
                if not matches:
                    raise FileNotFoundError(skill + '.py not found')
                skill = matches[0]
            elif not self._is_skill_class(skill):
                raise ValueError('unkown type for skill')
            self.add_skill(skill, reload)

    def get_skills(self):
        """Returns a map of skills configured and loaded."""
        return self._skill_map

    def search_skills(self, directory=None):
        """Searches a given directory for compatible skills and returns a map of available skill names and classes.

        Every ``*.py`` file in the directory is executed as a module; files that fail
        to load or that do not define ``skill_class`` are logged and skipped.

        Args:
            directory: Directory to scan. Defaults to the ``skills`` directory next to
                this file. ``~`` and environment variables are expanded.

        Returns:
            Dict mapping the file name up to its first dot to the skill class found in it.
        """
        # ``import importlib`` alone does not guarantee the ``util`` submodule is loaded.
        import importlib.util

        if not directory:
            directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'skills')
        directory = os.path.normpath(os.path.expanduser(os.path.expandvars(directory)))

        name_map = {}
        candidates = {x.split('.')[0]: x for x in os.listdir(directory) if x.endswith('.py')}
        for name, filename in candidates.items():
            try:
                spec = importlib.util.spec_from_file_location(name, os.path.join(directory, filename))
                mod = importlib.util.module_from_spec(spec)
                # NOTE: this runs the file's top-level code; only point at trusted directories.
                spec.loader.exec_module(mod)
                name_map[name] = getattr(mod, self.skill_class)
            except Exception as e:
                # A single broken skill file must not prevent the others from loading.
                self.log.error("Unable to load skill from " + filename + ": " + str(e))
        return name_map

    def startup(self):
        """Startup all message skill processors.

        Each registered skill class is instantiated without arguments and its
        ``startup()`` is called; skills that raise are logged and left out of
        ``ready_skills``.
        """
        for skill_name, skill_type in self._skill_map.items():
            try:
                skill_instance = skill_type()
                skill_instance.startup()
            except Exception as e:
                self.log.error(str(e))
                continue
            self.ready_skills[skill_name] = skill_instance

    def receive_message(self, message: Message):
        """Process the receipt of a message.

        Every ready skill whose ``match_intent(message)`` is truthy is asked to
        ``respond``; each truthy response is logged and queued on ``outbound_messages``.
        """
        for skill in self.ready_skills.values():
            if not skill.match_intent(message):
                continue
            response = skill.respond(message=message)
            if response:
                self.log.info(f'Responding to \'{message.text}\' with \'{response.text}\'')
                self.outbound_messages.put(response)

    def send_new_message(self):
        """Process new messages generated asynchronously by plugins.

        Loops forever, polling ``generate()`` on every ready skill once per second
        and queueing any truthy result; ``__init__`` runs this on a daemon thread.
        """
        while True:
            for skill in self.ready_skills.values():
                response = skill.generate()
                if response:
                    self.log.info(f'Generating async message \'{response.text}\'')
                    self.outbound_messages.put(response)
            time.sleep(1)