diff --git a/bin/bluebubbles_bot b/bin/bluebubbles_bot index 96cccca..e5a3d66 100644 --- a/bin/bluebubbles_bot +++ b/bin/bluebubbles_bot @@ -2,6 +2,7 @@ import os import sys +import time import yaml import uuid import logging @@ -11,7 +12,9 @@ from typing import List from dataclasses import dataclass import datetime import requests -from fastapi import FastAPI +from fastapi import BackgroundTasks, FastAPI +import queue +import threading import uvicorn class LoggingFormatter(logging.Formatter): @@ -101,7 +104,7 @@ if __name__ == '__main__': attachments.append(file) payload.update({'attachments':attachments}) url = os.environ['BB_SERVER_URL'].rstrip('/') + '/api/v1/message/attachment' - requests.post(url,params=params,json=payload) + requests.post(url,params=params,json=payload,timeout=(1, 3)) def get_full_attachments(message: persona.Message) -> persona.Message : """Given a message with basic attachment descriptions, fetch the full attachment payloads from BlueBubbles""" @@ -113,13 +116,33 @@ if __name__ == '__main__': content = requests.get(url,params=params).content attachment.data = content return message + + def schedule_responses(message_queue) : + """Iterate through the outbound message queue and dispatch scheduled messages.""" + while True : + message = message_queue.get() + if message.timestamp < datetime.datetime.now() : + try : + send_message(message) + except requests.exceptions.ReadTimeout : + log.warning('No receipt of acknowledgement from BlueBubbles server') + else : + message_queue.put(message) + time.sleep(1) + + # Configure multithreading response queue + outbound_message_queue = queue.Queue() # Create persona instance - current_persona = persona.Persona() + current_persona = persona.Persona(multiprocess_queue=outbound_message_queue) + # Kickoff message generation sender + message_sender = threading.Thread(target=schedule_responses, args=[outbound_message_queue]) + message_sender.daemon = True + message_sender.start() # Create a fastAPI instance @bot.post('/message') - async def message(content: dict): - log.debug(content) + async def message(content: dict,background_tasks: BackgroundTasks) : + # log.debug(content) if content['type'] == 'new-message' : message = content['data'] # Determine sender and receiver @@ -157,11 +180,10 @@ if __name__ == '__main__': meta={'subject': subject, 'effectId': effect_id,'isFromMe': isfromme} ) persona_message = get_full_attachments(persona_message) - responses = current_persona.receive_message(persona_message) - for response in responses : - prompt = persona_message - send_message(response) - return content + background_tasks.add_task(current_persona.receive_message, persona_message) + return {'status': 'ok'} + + # Start Uvicorn process and async message response scheduler bind_port = int(os.environ['BIND_PORT']) uvicorn.run(bot,host='0.0.0.0', port=bind_port) diff --git a/persona/__init__.py b/persona/__init__.py index 8e6184b..f40e37f 100644 --- a/persona/__init__.py +++ b/persona/__init__.py @@ -64,3 +64,10 @@ class PersonaBaseSkill() : This is called after the intent has been matched and the skill should produce a response. The Persona may modify the message, including its recipients, timestamp, or content.""" raise PersonaResponseException('This skill is not implemented to respond.') + + def generate(self) -> Message : + """Asynchronously generate a message. + This function should return None 99% of the time. + This is called frequently in a spin-lock, so treat any code executed here as a while loop. + The Persona may modify the message, including its recipients, timestamp, or content.""" + return None diff --git a/persona/persona.py b/persona/persona.py index ab9af7c..b000585 100644 --- a/persona/persona.py +++ b/persona/persona.py @@ -6,6 +6,9 @@ import importlib from typing import List from dataclasses import dataclass import datetime +import queue +import threading +import time class LoggingFormatter(logging.Formatter): def format(self, record): @@ -31,15 +34,24 @@ class Persona : skill_class = 'Persona'.strip('-').capitalize() + 'Skill' skill_class = 'PersonaSkill' ready_skills = {} - def __init__(self) : + outbound_messages = queue.Queue() + + def __init__(self, multiprocess_queue=None) : # Enable logging self.log = logging.getLogger(__name__) self.log = logging.LoggerAdapter(self.log,{'log_module':'persona'}) logging.getLogger().handlers[0].setFormatter(LoggingFormatter()) + # Configure multiprocessing if requested + if multiprocess_queue : + self.outbound_messages = multiprocess_queue # Load skills available_skills = self.search_skills(directory='/'.join(os.path.realpath(__file__).split('/')[:-2]) + '/persona' + '/skills') self.use_skills([x for x in available_skills.values()]) self.startup() + # Kickoff async message generation checker + message_generator = threading.Thread(target=self.send_new_message) + message_generator.daemon = True + message_generator.start() # loading code def add_skill(self,skill,reload) : @@ -96,7 +108,7 @@ class Persona : name_map.update({filename.split('.')[0]:instance}) except Exception as e : # Handle skill loading issues if desired - print("Unable to load skill from " + filename + ": " + str(e)) + self.log.error("Unable to load skill from " + filename + ": " + str(e)) return name_map def startup(self) : @@ -111,15 +123,24 @@ class Persona : self.log.error(str(e)) continue - def receive_message(self,message: Message) : """Process the receipt of a message.""" - generated_messages = [] for skill in self.ready_skills.values() : should_respond = skill.match_intent(message) if should_respond : response = skill.respond(message=message) if response : self.log.info(f'Responding to \'{message.text}\' with \'{response.text}\'') + self.outbound_messages.put(response) + # self.outbound_messages += generated_messages + + def send_new_message(self) : + """Process new messages generated asynchronously by plugins.""" + while True : + for skill in self.ready_skills.values() : + response = skill.generate() + if response : + self.log.info(f'Generating async message \'{response.text}\'') generated_messages.append(response) - return generated_messages + self.outbound_messages.put(response) + time.sleep(1) diff --git a/persona/skills/chatgpt.py b/persona/skills/chatgpt.py index c0b8fe1..7e0e1fc 100644 --- a/persona/skills/chatgpt.py +++ b/persona/skills/chatgpt.py @@ -44,7 +44,7 @@ class PersonaSkill(PersonaBaseSkill) : def match_intent(self,message: Message) -> Bool : # Tag user and bot for API if message.meta['isFromMe'] : - role = 'system' + role = 'assistant' else : role = 'user' # Record chat messages for context diff --git a/persona/skills/greeter.py b/persona/skills/greeter.py index 8bc1a92..f3c8071 100644 --- a/persona/skills/greeter.py +++ b/persona/skills/greeter.py @@ -32,4 +32,4 @@ class PersonaSkill(PersonaBaseSkill) : """Respond to a message by generating another message.""" response_options = ['Hello!','Howdy!','Hello there!','What\'s up!','Hi there!'] response_text = random.choice(response_options) - return persona.Message(text=response_text,sender_identifier=message.sender_identifier,chat_identifier=message.chat_identifier,attachments=[],timestamp=datetime.datetime.now(), recipients=[message.sender_identifier], identifier=None, meta={}) + return persona.Message(text=response_text,sender_identifier=message.sender_identifier,chat_identifier=message.chat_identifier,attachments=[],timestamp=datetime.datetime.now()+datetime.timedelta(seconds=15), recipients=[message.sender_identifier], identifier=None, meta={})