bluebubbles-bot/bin/bluebubbles_bot

190 lines
6.6 KiB
Python

#!python3
import os
import sys
import time
import yaml
import uuid
import logging
import argparse
import persona
from typing import List
from dataclasses import dataclass
import datetime
import requests
from fastapi import BackgroundTasks, FastAPI
import queue
import threading
import uvicorn
class LoggingFormatter(logging.Formatter):
    """Render records as ``<time> [LEVEL] [<module>] <message>`` in padded columns."""

    def format(self, record):
        """Return the formatted log line for *record*.

        The module column shows ``record.log_module`` when the record carries
        that attribute (for example via a ``LoggerAdapter`` extra); otherwise
        it shows ``<record.module>.<record.name>``. The column is cut and
        padded to a fixed width so consecutive lines stay aligned.

        The timestamp is produced by ``formatTime`` with the formatter's own
        ``datefmt`` (the logging default when none was given).
        """
        module_max_width = 30
        level = f'[{record.levelname}]'.ljust(9)
        if hasattr(record, 'log_module'):
            modname = record.log_module
        else:
            modname = f'{record.module}.{record.name}'
        # Reserve one character for the closing bracket before padding
        modname = (f'{modname}'[:module_max_width - 1] + ']').ljust(module_max_width)
        timestamp = self.formatTime(record, self.datefmt)
        return "%-7s %s [%s %s" % (timestamp, level, modname, record.getMessage())
bot = FastAPI()
if __name__ == '__main__':
    # Command-line client
    # Define constants
    config_template = {'bluebubbles_bot': {}}
    # Gather argument options
    EXAMPLE_TEXT='Example:\n\tbluebubbles_bot -h'
    parser = argparse.ArgumentParser(epilog=EXAMPLE_TEXT,formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-l', '--log', action='store', help='Specify a file to log to.')
    parser.add_argument('-v', '--verbose', action='count', help='Include verbose information in the output. Add \'v\'s for more output.',default=0)
    args = parser.parse_args()
    log = logging.getLogger(__name__)
    log = logging.LoggerAdapter(log,{'log_module':'bluebubbles_bot'})
    # Configure logging: each -v moves one step from ERROR towards DEBUG,
    # and anything past the last level is clamped to DEBUG
    log_options = [logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]
    args.verbose = min(args.verbose or 0, len(log_options) - 1)
    if args.log:
        # Log to the requested file and mirror everything to stderr
        logging.basicConfig(level=log_options[args.verbose], filename=args.log)
        logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
    else:
        logging.basicConfig(level=log_options[args.verbose])
    # Give every root handler the column formatter so the file and stderr
    # outputs share one layout
    for root_handler in logging.getLogger().handlers:
        root_handler.setFormatter(LoggingFormatter())
    # Check for missing environment variables; report all of them before exiting
    required_vars = ['BB_SERVER_URL', 'BB_SERVER_PASSWORD', 'BIND_PORT']
    missing_vars = [name for name in required_vars if name not in os.environ]
    for required_var in missing_vars:
        log.error(f'Missing required ENV variable {required_var}')
    if missing_vars:
        # sys.exit rather than the site-provided exit(), which is not always available
        sys.exit(1)
def send_message(message):
    """Send a message to the server, optionally with attachments.

    Posts to ``/api/v1/message/text`` under ``BB_SERVER_URL``, or to
    ``/api/v1/message/attachment`` when the message has attachments. The
    server password is passed as the ``password`` query parameter.

    Args:
        message: Outbound message. Reads ``text``, ``chat_identifier``,
            ``meta`` (a mapping; optional ``effectId`` and ``subject`` keys)
            and ``attachments`` (a sequence of attachment objects, or a
            mapping whose values are attachment objects, each exposing
            ``data`` and ``mime_type``).

    Raises:
        KeyError: If ``BB_SERVER_URL`` or ``BB_SERVER_PASSWORD`` is unset.
        requests.exceptions.RequestException: Propagated from ``requests.post``
            (including timeouts).
    """
    # Use the private API instead of AppleScript when explicitly enabled
    use_private_api = os.environ.get('USE_PRIVATE_API', '').lower() in ('true', 'yes')
    method = 'private-api' if use_private_api else 'apple-script'
    uid = str(uuid.uuid4()).upper()
    base_url = os.environ['BB_SERVER_URL'].rstrip('/')
    url = base_url + '/api/v1/message/text'
    params = {'password': os.environ['BB_SERVER_PASSWORD']}
    payload = {
        'chatGuid': message.chat_identifier,
        'message': message.text,
        'tempGuid': uid,
        'method': method,
        'effectId': message.meta.get('effectId', ''),
        'subject': message.meta.get('subject', ''),
        'selectedMessageGuid': '',
    }
    attachments = message.attachments
    if attachments:
        # Accept either a mapping of attachments or a plain sequence of them
        entries = attachments.values() if isinstance(attachments, dict) else attachments
        payload['name'] = uid
        payload['attachments'] = [
            {'file': (uid, entry.data, entry.mime_type)} for entry in entries
        ]
        # NOTE(review): attachment data is embedded in the JSON body as-is, so
        # it must be JSON-serializable (raw bytes are not), and the message
        # text is still included - confirm the upload format expected by the
        # BlueBubbles attachment endpoint.
        url = base_url + '/api/v1/message/attachment'
    # (connect, read) timeouts in seconds
    requests.post(url, params=params, json=payload, timeout=(1, 3))
def get_full_attachments(message: persona.Message) -> persona.Message :
    """Given a message with basic attachment descriptions, fetch the full attachment payloads from BlueBubbles.

    On entry each attachment's ``data`` holds the identifier used in the
    download URL; on return it holds the raw response body of
    ``/api/v1/attachment/<data>/download``.

    Args:
        message: Message whose ``attachments`` are updated in place.

    Returns:
        The same ``message`` object.

    Raises:
        KeyError: If ``BB_SERVER_URL`` or ``BB_SERVER_PASSWORD`` is unset
            (only read when there is at least one attachment).
        requests.exceptions.RequestException: Propagated from ``requests.get``
            (including timeouts).
    """
    if not message.attachments:
        return message
    params = {'password': os.environ['BB_SERVER_PASSWORD']}
    attachment_root = os.environ['BB_SERVER_URL'].rstrip('/') + '/api/v1/attachment/'
    for attachment in message.attachments:
        url = attachment_root + attachment.data + '/download'
        # Bound the request so an unresponsive server cannot block the caller
        # forever; values are (connect, read) seconds
        attachment.data = requests.get(url, params=params, timeout=(3, 30)).content
    return message
def schedule_responses(message_queue) :
    """Iterate through the outbound message queue and dispatch scheduled messages.

    Runs forever and is meant to be a thread target. Each pass takes one
    message off the queue: if its ``timestamp`` is already in the past it is
    sent with ``send_message``; otherwise it is put back on the queue to be
    looked at again later.

    Args:
        message_queue: Queue of outbound messages exposing ``get`` and ``put``.
    """
    while True:
        message = message_queue.get()
        if message.timestamp < datetime.datetime.now():
            try:
                send_message(message)
            except requests.exceptions.ReadTimeout:
                log.warning('No receipt of acknowledgement from BlueBubbles server')
            except Exception:
                # Any other failure would otherwise end this loop, and with it
                # all outbound delivery; log it and keep going
                log.exception('Failed to send message to BlueBubbles server')
        else:
            # Not due yet: requeue for a later pass
            message_queue.put(message)
        # Pause between passes so a queue holding only future-dated messages
        # does not spin the CPU
        time.sleep(1)
# Queue shared between the persona and the sender thread
outbound_message_queue = queue.Queue()
# Persona instance, handed the outbound queue
current_persona = persona.Persona(multiprocess_queue=outbound_message_queue)
# Daemon thread running schedule_responses against the outbound queue
message_sender = threading.Thread(
    target=schedule_responses,
    args=(outbound_message_queue,),
    daemon=True,
)
message_sender.start()
# Endpoint receiving events posted to /message
@bot.post('/message')
async def message(content: dict, background_tasks: BackgroundTasks) :
    """Handle an event posted to ``/message``.

    Events whose ``type`` is ``new-message`` are converted to a
    ``persona.Message``, their attachments are downloaded, and the result is
    handed to ``current_persona.receive_message`` as a background task. Every
    other event is acknowledged and ignored.

    Args:
        content: Decoded request body; ``content['data']`` carries the message
            for ``new-message`` events (presumably a BlueBubbles webhook
            payload - confirm against the server's webhook format).
        background_tasks: FastAPI background task collector.

    Returns:
        ``{'status': 'ok'}``.
    """
    log.debug(content)
    if content.get('type') != 'new-message':
        return {'status': 'ok'}
    event = content['data']
    # Read the flag once; a payload without it is treated as not sent by us
    is_from_me = event.get('isFromMe', False)
    # Determine sender and receiver
    if not is_from_me:
        sender = event['handle']['address']
        recipients = []
    else:
        sender = 'Me'
        recipients = [event['handle']['address']]
    # Resolve attachments; data holds the guid until the payload is downloaded
    attachments = [
        persona.Attachment(mime_type=item['mimeType'], data=item['guid'])
        for item in event['attachments']
    ]
    # dateCreated is divided by 1000 before conversion (milliseconds to seconds)
    date_sent = datetime.datetime.fromtimestamp(event['dateCreated'] / 1000)
    # Craft the message
    persona_message = persona.Message(
        text=event['text'],
        sender_identifier=sender,
        chat_identifier=event['chats'][-1]['guid'],
        identifier=event['guid'],
        timestamp=date_sent,
        recipients=recipients,
        attachments=attachments,
        meta={
            'subject': event.get('subject', ''),
            'effectId': event.get('expressiveSendStyleId', ''),
            'isFromMe': is_from_me,
        },
    )
    # Attachment payloads are downloaded here, before the task is queued
    persona_message = get_full_attachments(persona_message)
    background_tasks.add_task(current_persona.receive_message, persona_message)
    return {'status': 'ok'}
# Serve the FastAPI app with Uvicorn on all interfaces, using the port from BIND_PORT
bind_port = int(os.environ['BIND_PORT'])
uvicorn.run(app=bot, host='0.0.0.0', port=bind_port)