pypush/demo.py
2023-08-17 20:23:56 -04:00

245 lines
8.5 KiB
Python

import json
import logging
import os
import threading
import time
from base64 import b64decode, b64encode
from getpass import getpass
from rich.logging import RichHandler
import apns
import ids
import imessage
# Send every record through Rich. The root logger is left at NOTSET so nothing
# is filtered globally; the per-logger levels below do the trimming.
logging.basicConfig(
    level=logging.NOTSET,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler()],
)

# Set sane log levels
for _logger_name, _logger_level in (
    ("urllib3", logging.WARNING),
    ("py.warnings", logging.ERROR),  # Ignore warnings from urllib3
    ("asyncio", logging.WARNING),
    ("jelly", logging.INFO),
    ("nac", logging.INFO),
    ("apns", logging.INFO),
    ("albert", logging.INFO),
    ("ids", logging.DEBUG),
    ("bags", logging.INFO),
    ("imessage", logging.DEBUG),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)

# Route warnings.warn() output into the "py.warnings" logger configured above.
logging.captureWarnings(True)
def safe_b64decode(s):
    """Decode a base64 value, returning ``None`` instead of raising.

    Args:
        s: Input accepted by ``base64.b64decode`` (an ASCII ``str`` or a
            bytes-like object). Any other value, including ``None``, is
            treated as undecodable.

    Returns:
        The decoded ``bytes``, or ``None`` when *s* cannot be decoded.
    """
    try:
        return b64decode(s)
    except (ValueError, TypeError):
        # binascii.Error (bad padding / invalid characters) subclasses
        # ValueError; TypeError covers non-string input such as None.
        # KeyboardInterrupt and SystemExit are deliberately NOT swallowed.
        return None
async def main():
# Demo entry point: load any saved credentials from config.json, open an APNs
# connection, authenticate and register an IDS user on it, write the resulting
# credentials back to config.json, and build an iMessageUser.
# NOTE(review): leading indentation is missing in this copy of the file, so
# the nesting of the statements below must be confirmed against upstream.
#
# Load previously saved credentials; a missing file just means a first run,
# so start from an empty config.
try:
with open("config.json", "r") as f:
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
# Each lookup yields None when the "push" section (or the key) is absent.
push_creds = apns.PushCredentials(
CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert"), CONFIG.get("push", {}).get("token")
)
async with apns.APNSConnection.start(push_creds) as conn:
# NOTE(review): the meaning of state 1 is defined by the apns module, not
# shown here; "com.apple.madrid" is presumably the iMessage topic - confirm.
conn.set_state(1)
conn.filter(["com.apple.madrid"])
user = ids.IDSUser(conn)
# Reuse the saved auth keypair when one exists; otherwise prompt for the
# account's username and password (password read via getpass).
if CONFIG.get("auth", {}).get("cert") is not None:
auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"])
user_id = CONFIG["auth"]["user_id"]
handles = CONFIG["auth"]["handles"]
user.restore_authentication(auth_keypair, user_id, handles)
else:
username = input("Username: ")
password = getpass("Password: ")
user.authenticate(username, password)
# Both keys are None when no "encryption" section was saved.
# NOTE(review): presumably IDSIdentity generates fresh keys in that case -
# confirm in ids.identity.
user.encryption_identity = ids.identity.IDSIdentity(
encryption_key=CONFIG.get("encryption", {}).get("rsa_key"),
signing_key=CONFIG.get("encryption", {}).get("ec_key"),
)
# NOTE(review): encryption_identity was assigned just above, so the second
# condition looks like it is always true - confirm whether it is needed.
if (
CONFIG.get("id", {}).get("cert") is not None
and user.encryption_identity is not None
):
id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
user.restore_identity(id_keypair)
else:
logging.info("Registering new identity...")
# Imported here rather than at file top, so emulated.nac is only loaded
# when a new registration is actually performed.
import emulated.nac
# Validation data is passed to register() as a base64 text string.
vd = emulated.nac.generate_validation_data()
vd = b64encode(vd).decode()
user.register(vd)
# Write the current credentials back to config.json so later runs can take
# the restore_* paths above.
CONFIG["encryption"] = {
"rsa_key": user.encryption_identity.encryption_key,
"ec_key": user.encryption_identity.signing_key,
}
CONFIG["id"] = {
"key": user._id_keypair.key,
"cert": user._id_keypair.cert,
}
CONFIG["auth"] = {
"key": user._auth_keypair.key,
"cert": user._auth_keypair.cert,
"user_id": user.user_id,
"handles": user.handles,
}
# NOTE(review): the token is stored base64-encoded here, but at load time it
# is handed to PushCredentials without decoding (safe_b64decode is not used
# in the visible code) - confirm PushCredentials expects the encoded form.
CONFIG["push"] = {
"token": b64encode(user.push_connection.credentials.token).decode(),
"key": user.push_connection.credentials.private_key,
"cert": user.push_connection.credentials.cert,
}
with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4)
# iMessage client built on the connection and the registered user.
im = imessage.iMessageUser(conn, user)
#INPUT_QUEUE = apns.IncomingQueue()
# def input_thread():
# from prompt_toolkit import prompt
# while True:
# try:
# msg = prompt('>> ')
# except:
# msg = 'quit'
# INPUT_QUEUE.append(msg)
# threading.Thread(target=input_thread, daemon=True).start()
# print("Type 'help' for help")
# def fixup_handle(handle):
# if handle.startswith('tel:+'):
# return handle
# elif handle.startswith('mailto:'):
# return handle
# elif handle.startswith('tel:'):
# return 'tel:+' + handle[4:]
# elif handle.startswith('+'):
# return 'tel:' + handle
# # If the handle starts with a number
# elif handle[0].isdigit():
# # If the handle is 10 digits, assume it's a US number
# if len(handle) == 10:
# return 'tel:+1' + handle
# # If the handle is 11 digits, assume it's a US number with country code
# elif len(handle) == 11:
# return 'tel:+' + handle
# else: # Assume it's an email
# return 'mailto:' + handle
# current_participants = []
# current_effect = None
# while True:
# msg = im.receive()
# if msg is not None:
# # print(f'[{msg.sender}] {msg.text}')
# print(msg.to_string())
# attachments = msg.attachments()
# if len(attachments) > 0:
# attachments_path = f"attachments/{msg.id}/"
# os.makedirs(attachments_path, exist_ok=True)
# for attachment in attachments:
# with open(attachments_path + attachment.name, "wb") as attachment_file:
# attachment_file.write(attachment.versions[0].data())
# print(f"({len(attachments)} attachment{'s have' if len(attachments) != 1 else ' has'} been downloaded and put "
# f"in {attachments_path})")
# if len(INPUT_QUEUE) > 0:
# msg = INPUT_QUEUE.pop()
# if msg == '': continue
# if msg == 'help' or msg == 'h':
# print('help (h): show this message')
# print('quit (q): quit')
# #print('send (s) [recipient] [message]: send a message')
# print('filter (f) [recipient]: set the current chat')
# print('effect (e): adds an iMessage effect to the next sent message')
# print('note: recipient must start with tel: or mailto: and include the country code')
# print('handle <handle>: set the current handle (for sending messages)')
# print('\\: escape commands (will be removed from message)')
# elif msg == 'quit' or msg == 'q':
# break
# elif msg == 'effect' or msg == 'e' or msg.startswith("effect ") or msg.startswith("e "):
# msg = msg.split(" ")
# if len(msg) < 2 or msg[1] == "":
# print("effect [effect namespace]")
# else:
# print(f"next message will be sent with [{msg[1]}]")
# current_effect = msg[1]
# elif msg == 'filter' or msg == 'f' or msg.startswith('filter ') or msg.startswith('f '):
# # Set the current chat
# msg = msg.split(' ')
# if len(msg) < 2 or msg[1] == '':
# print('filter [recipients]')
# else:
# print(f'Filtering to {[fixup_handle(h) for h in msg[1:]]}')
# current_participants = [fixup_handle(h) for h in msg[1:]]
# elif msg == 'handle' or msg.startswith('handle '):
# msg = msg.split(' ')
# if len(msg) < 2 or msg[1] == '':
# print('handle [handle]')
# print('Available handles:')
# for h in user.handles:
# if h == user.current_handle:
# print(f'\t{h} (current)')
# else:
# print(f'\t{h}')
# else:
# h = msg[1]
# h = fixup_handle(h)
# if h in user.handles:
# print(f'Using {h} as handle')
# user.current_handle = h
# else:
# print(f'Handle {h} not found')
# elif current_participants != []:
# if msg.startswith('\\'):
# msg = msg[1:]
# im.send(imessage.iMessage(
# text=msg,
# participants=current_participants,
# sender=user.current_handle,
# effect=current_effect
# ))
# current_effect = None
# else:
# print('No chat selected, use help for help')
# time.sleep(0.1)
# # elif msg.startswith('send') or msg.startswith('s'):
# # msg = msg.split(' ')
# # if len(msg) < 3:
# # print('send [recipient] [message]')
# # else:
# # im.send(imessage.iMessage(
# # text=' '.join(msg[2:]),
# # participants=[msg[1], user.handles[0]],
# # #sender=user.handles[0]
# # ))