Implemented queueing system for outbound messages.
Added scheduler for future messages from any plugin fixed chatGPT message context gistory
This commit is contained in:
parent
a3e5e364a7
commit
fe9c735ef7
@ -2,6 +2,7 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import yaml
|
||||
import uuid
|
||||
import logging
|
||||
@ -11,7 +12,9 @@ from typing import List
|
||||
from dataclasses import dataclass
|
||||
import datetime
|
||||
import requests
|
||||
from fastapi import FastAPI
|
||||
from fastapi import BackgroundTasks, FastAPI
|
||||
import queue
|
||||
import threading
|
||||
import uvicorn
|
||||
|
||||
class LoggingFormatter(logging.Formatter):
|
||||
@ -101,7 +104,7 @@ if __name__ == '__main__':
|
||||
attachments.append(file)
|
||||
payload.update({'attachments':attachments})
|
||||
url = os.environ['BB_SERVER_URL'].rstrip('/') + '/api/v1/message/attachment'
|
||||
requests.post(url,params=params,json=payload)
|
||||
requests.post(url,params=params,json=payload,timeout=(1, 3))
|
||||
|
||||
def get_full_attachments(message: persona.Message) -> persona.Message :
|
||||
"""Given a message with basic attachment descriptions, fetch the full attachment payloads from BlueBubbles"""
|
||||
@ -113,13 +116,33 @@ if __name__ == '__main__':
|
||||
content = requests.get(url,params=params).content
|
||||
attachment.data = content
|
||||
return message
|
||||
|
||||
def schedule_responses(message_queue) :
|
||||
"""Iterate through the outbound message queue and dispatch scheduled messages."""
|
||||
while True :
|
||||
message = message_queue.get()
|
||||
if message.timestamp < datetime.datetime.now() :
|
||||
try :
|
||||
send_message(message)
|
||||
except requests.exceptions.ReadTimeout :
|
||||
log.warning('No receipt of acknowledgement from BlueBubbles server')
|
||||
else :
|
||||
message_queue.put(message)
|
||||
time.sleep(1)
|
||||
|
||||
# Configure multithreading response queue
|
||||
outbound_message_queue = queue.Queue()
|
||||
# Create persona instance
|
||||
current_persona = persona.Persona()
|
||||
current_persona = persona.Persona(multiprocess_queue=outbound_message_queue)
|
||||
# Kickoff message generation sender
|
||||
message_sender = threading.Thread(target=schedule_responses, args=[outbound_message_queue])
|
||||
message_sender.daemon = True
|
||||
message_sender.start()
|
||||
|
||||
# Create a fastAPI instance
|
||||
@bot.post('/message')
|
||||
async def message(content: dict):
|
||||
log.debug(content)
|
||||
async def message(content: dict,background_tasks: BackgroundTasks) :
|
||||
# log.debug(content)
|
||||
if content['type'] == 'new-message' :
|
||||
message = content['data']
|
||||
# Determine sender and receiver
|
||||
@ -157,11 +180,10 @@ if __name__ == '__main__':
|
||||
meta={'subject': subject, 'effectId': effect_id,'isFromMe': isfromme}
|
||||
)
|
||||
persona_message = get_full_attachments(persona_message)
|
||||
responses = current_persona.receive_message(persona_message)
|
||||
for response in responses :
|
||||
prompt = persona_message
|
||||
send_message(response)
|
||||
return content
|
||||
background_tasks.add_task(current_persona.receive_message, persona_message)
|
||||
return {'status': 'ok'}
|
||||
|
||||
|
||||
# Start Uvicorn process and async message response scheduler
|
||||
bind_port = int(os.environ['BIND_PORT'])
|
||||
uvicorn.run(bot,host='0.0.0.0', port=bind_port)
|
||||
|
@ -64,3 +64,10 @@ class PersonaBaseSkill() :
|
||||
This is called after the intent has been matched and the skill should produce a response.
|
||||
The Persona may modify the message, including its recipients, timestamp, or content."""
|
||||
raise PersonaResponseException('This skill is not implemented to respond.')
|
||||
|
||||
def generate(self) -> Message :
|
||||
"""Asynchronously generate a message.
|
||||
This function should return None 99% of the time.
|
||||
This is called frequently in a spin-lock, so treat any code executed here as a while loop.
|
||||
The Persona may modify the message, including its recipients, timestamp, or content."""
|
||||
return None
|
||||
|
@ -6,6 +6,9 @@ import importlib
|
||||
from typing import List
|
||||
from dataclasses import dataclass
|
||||
import datetime
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
|
||||
class LoggingFormatter(logging.Formatter):
|
||||
def format(self, record):
|
||||
@ -31,15 +34,24 @@ class Persona :
|
||||
skill_class = 'Persona'.strip('-').capitalize() + 'Skill'
|
||||
skill_class = 'PersonaSkill'
|
||||
ready_skills = {}
|
||||
def __init__(self) :
|
||||
outbound_messages = queue.Queue()
|
||||
|
||||
def __init__(self, multiprocess_queue=None) :
|
||||
# Enable logging
|
||||
self.log = logging.getLogger(__name__)
|
||||
self.log = logging.LoggerAdapter(self.log,{'log_module':'persona'})
|
||||
logging.getLogger().handlers[0].setFormatter(LoggingFormatter())
|
||||
# Configure multiprocessing if requested
|
||||
if multiprocess_queue :
|
||||
self.outbound_messages = multiprocess_queue
|
||||
# Load skills
|
||||
available_skills = self.search_skills(directory='/'.join(os.path.realpath(__file__).split('/')[:-2]) + '/persona' + '/skills')
|
||||
self.use_skills([x for x in available_skills.values()])
|
||||
self.startup()
|
||||
# Kickoff async message generation checker
|
||||
message_generator = threading.Thread(target=self.send_new_message)
|
||||
message_generator.daemon = True
|
||||
message_generator.start()
|
||||
|
||||
# loading code
|
||||
def add_skill(self,skill,reload) :
|
||||
@ -96,7 +108,7 @@ class Persona :
|
||||
name_map.update({filename.split('.')[0]:instance})
|
||||
except Exception as e :
|
||||
# Handle skill loading issues if desired
|
||||
print("Unable to load skill from " + filename + ": " + str(e))
|
||||
self.log.error("Unable to load skill from " + filename + ": " + str(e))
|
||||
return name_map
|
||||
|
||||
def startup(self) :
|
||||
@ -111,15 +123,24 @@ class Persona :
|
||||
self.log.error(str(e))
|
||||
continue
|
||||
|
||||
|
||||
def receive_message(self,message: Message) :
|
||||
"""Process the receipt of a message."""
|
||||
generated_messages = []
|
||||
for skill in self.ready_skills.values() :
|
||||
should_respond = skill.match_intent(message)
|
||||
if should_respond :
|
||||
response = skill.respond(message=message)
|
||||
if response :
|
||||
self.log.info(f'Responding to \'{message.text}\' with \'{response.text}\'')
|
||||
self.outbound_messages.put(response)
|
||||
# self.outbound_messages += generated_messages
|
||||
|
||||
def send_new_message(self) :
|
||||
"""Process new messages generated asynchronously by plugins."""
|
||||
while True :
|
||||
for skill in self.ready_skills.values() :
|
||||
response = skill.generate()
|
||||
if response :
|
||||
self.log.info(f'Generating async message \'{response.text}\'')
|
||||
generated_messages.append(response)
|
||||
return generated_messages
|
||||
self.outbound_messages.put(response)
|
||||
time.sleep(1)
|
||||
|
@ -44,7 +44,7 @@ class PersonaSkill(PersonaBaseSkill) :
|
||||
def match_intent(self,message: Message) -> Bool :
|
||||
# Tag user and bot for API
|
||||
if message.meta['isFromMe'] :
|
||||
role = 'system'
|
||||
role = 'assistant'
|
||||
else :
|
||||
role = 'user'
|
||||
# Record chat messages for context
|
||||
|
@ -32,4 +32,4 @@ class PersonaSkill(PersonaBaseSkill) :
|
||||
"""Respond to a message by generating another message."""
|
||||
response_options = ['Hello!','Howdy!','Hello there!','What\'s up!','Hi there!']
|
||||
response_text = random.choice(response_options)
|
||||
return persona.Message(text=response_text,sender_identifier=message.sender_identifier,chat_identifier=message.chat_identifier,attachments=[],timestamp=datetime.datetime.now(), recipients=[message.sender_identifier], identifier=None, meta={})
|
||||
return persona.Message(text=response_text,sender_identifier=message.sender_identifier,chat_identifier=message.chat_identifier,attachments=[],timestamp=datetime.datetime.now()+datetime.timedelta(seconds=15), recipients=[message.sender_identifier], identifier=None, meta={})
|
||||
|
Loading…
Reference in New Issue
Block a user