Add ability to send iMessages (#10)

This commit is contained in:
JJTech 2023-07-31 13:30:30 -04:00 committed by GitHub
commit 84fe9ed44a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 436 additions and 117 deletions

16
apns.py
View File

@ -50,7 +50,7 @@ class IncomingQueue:
with self.lock: with self.lock:
self.queue.append(item) self.queue.append(item)
def pop(self, index): def pop(self, index = -1):
with self.lock: with self.lock:
return self.queue.pop(index) return self.queue.pop(index)
@ -107,6 +107,11 @@ class APNSConnection:
self.incoming_queue.append(payload) self.incoming_queue.append(payload)
logger.debug(f"Queue length: {len(self.incoming_queue)}") logger.debug(f"Queue length: {len(self.incoming_queue)}")
def _keep_alive_loop(self):
while True and not self.sock.closed:
time.sleep(300)
self._keep_alive()
def __init__(self, private_key=None, cert=None): def __init__(self, private_key=None, cert=None):
# Generate the private key and certificate if they're not provided # Generate the private key and certificate if they're not provided
if private_key is None or cert is None: if private_key is None or cert is None:
@ -117,12 +122,17 @@ class APNSConnection:
self.sock = _connect(self.private_key, self.cert) self.sock = _connect(self.private_key, self.cert)
# Start the queue filler thread
self.queue_filler_thread = threading.Thread( self.queue_filler_thread = threading.Thread(
target=self._queue_filler, daemon=True target=self._queue_filler, daemon=True
) )
self.queue_filler_thread.start() self.queue_filler_thread.start()
self.keep_alive_thread = threading.Thread(
target=self._keep_alive_loop, daemon=True
)
self.keep_alive_thread.start()
def connect(self, root: bool = True, token: bytes = None): def connect(self, root: bool = True, token: bytes = None):
if token is None: if token is None:
logger.debug(f"Sending connect message without token (root={root})") logger.debug(f"Sending connect message without token (root={root})")
@ -212,7 +222,7 @@ class APNSConnection:
) )
) )
def keep_alive(self): def _keep_alive(self):
logger.debug("Sending keep alive message") logger.debug("Sending keep alive message")
self.sock.write(_serialize_payload(0x0C, [])) self.sock.write(_serialize_payload(0x0C, []))
# Remove any keep alive responses we have or missed # Remove any keep alive responses we have or missed

96
demo.py
View File

@ -1,20 +1,15 @@
import gzip
import json import json
import logging import logging
import plistlib
import threading import threading
import time import time
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
from getpass import getpass from getpass import getpass
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from rich.logging import RichHandler from rich.logging import RichHandler
import apns import apns
import ids import ids
import imessage
logging.basicConfig( logging.basicConfig(
level=logging.NOTSET, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()] level=logging.NOTSET, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
@ -22,12 +17,14 @@ logging.basicConfig(
# Set sane log levels # Set sane log levels
logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("jelly").setLevel(logging.INFO) logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO) logging.getLogger("nac").setLevel(logging.INFO)
logging.getLogger("apns").setLevel(logging.DEBUG) logging.getLogger("apns").setLevel(logging.INFO)
logging.getLogger("albert").setLevel(logging.INFO) logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.DEBUG) logging.getLogger("ids").setLevel(logging.DEBUG)
logging.getLogger("bags").setLevel(logging.DEBUG) logging.getLogger("bags").setLevel(logging.INFO)
logging.getLogger("imessage").setLevel(logging.DEBUG)
# Try and load config.json # Try and load config.json
try: try:
@ -65,7 +62,10 @@ else:
user.authenticate(username, password) user.authenticate(username, password)
user.encryption_identity = ids.identity.IDSIdentity(encryption_key=CONFIG.get("encryption", {}).get("rsa_key"), signing_key=CONFIG.get("encryption", {}).get("ec_key")) user.encryption_identity = ids.identity.IDSIdentity(
encryption_key=CONFIG.get("encryption", {}).get("rsa_key"),
signing_key=CONFIG.get("encryption", {}).get("ec_key"),
)
if ( if (
CONFIG.get("id", {}).get("cert") is not None CONFIG.get("id", {}).get("cert") is not None
@ -84,17 +84,6 @@ else:
logging.info("Waiting for incoming messages...") logging.info("Waiting for incoming messages...")
# Create a thread to send keepalive messages
def keepalive():
while True:
time.sleep(300)
conn.keep_alive()
threading.Thread(target=keepalive, daemon=True).start()
# Write config.json # Write config.json
CONFIG["encryption"] = { CONFIG["encryption"] = {
"rsa_key": user.encryption_identity.encryption_key, "rsa_key": user.encryption_identity.encryption_key,
@ -119,13 +108,68 @@ CONFIG["push"] = {
with open("config.json", "w") as f: with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4) json.dump(CONFIG, f, indent=4)
import imessage
im = imessage.iMessageUser(conn, user) im = imessage.iMessageUser(conn, user)
#import time INPUT_QUEUE = apns.IncomingQueue()
#time.sleep(4)
#onn._send_ack(b'\t-\x97\x96') def input_thread():
from prompt_toolkit import prompt
while True:
try:
msg = prompt('>> ')
except:
msg = 'quit'
INPUT_QUEUE.append(msg)
threading.Thread(target=input_thread, daemon=True).start()
print("Type 'help' for help")
current_chat = None
while True: while True:
msg = im.receive() msg = im.receive()
if msg is not None: if msg is not None and msg.sender != user.handles[0]:
print(f"Got message {msg}") print(f'[{msg.sender}] {msg.text}')
if len(INPUT_QUEUE) > 0:
msg = INPUT_QUEUE.pop()
if msg == '': continue
if msg == 'help' or msg == 'h':
print('help (h): show this message')
print('quit (q): quit')
#print('send (s) [recipient] [message]: send a message')
print('filter (f) [recipient]: set the current chat')
print('note: recipient must start with tel: or mailto: and include the country code')
print('\\: escape commands (will be removed from message)')
elif msg == 'quit' or msg == 'q':
break
elif msg.startswith('filter') or msg.startswith('f'):
# Set the curernt chat
msg = msg.split(' ')
if len(msg) < 2:
print('filter [recipient]')
else:
current_chat = msg[1]
elif current_chat is not None:
if msg.startswith('\\'):
msg = msg[1:]
im.send(imessage.iMessage(
text=msg,
participants=[current_chat, user.handles[0]],
#sender=user.handles[0]
))
else:
print('No chat selected, use help for help')
# elif msg.startswith('send') or msg.startswith('s'):
# msg = msg.split(' ')
# if len(msg) < 3:
# print('send [recipient] [message]')
# else:
# im.send(imessage.iMessage(
# text=' '.join(msg[2:]),
# participants=[msg[1], user.handles[0]],
# #sender=user.handles[0]
# ))

View File

@ -219,7 +219,9 @@ def pretty_print_payload(
if topic == "com.apple.madrid": if topic == "com.apple.madrid":
print(f" {bcolors.FAIL}Madrid{bcolors.ENDC}", end="") print(f" {bcolors.FAIL}Madrid{bcolors.ENDC}", end="")
orig_payload = payload
payload = plistlib.loads(_get_field(payload[1], 3)) payload = plistlib.loads(_get_field(payload[1], 3))
# print(payload) # print(payload)
if "cT" in payload and False: if "cT" in payload and False:
# It's HTTP over APNs # It's HTTP over APNs
@ -248,9 +250,29 @@ def pretty_print_payload(
if b"plist" in body: if b"plist" in body:
body = plistlib.loads(body) body = plistlib.loads(body)
print(f" {bcolors.FAIL}Body{bcolors.ENDC}: {body}", end="") print(f" {bcolors.FAIL}Body{bcolors.ENDC}: {body}", end="")
if not "cT" in payload: #if not "cT" in payload:
for key in payload: for key in payload:
print(f" {bcolors.OKBLUE}{key}{bcolors.ENDC}: {payload[key]}") print(f" {bcolors.OKBLUE}{key}{bcolors.ENDC}: {payload[key]}")
if 'dtl' in payload:
print("OVERRIDE DTL")
payload['dtl'][0].update({'sT': b64decode("jJ86jTYbv1mGVwO44PyfuZ9lh3o56QjOE39Jk8Z99N8=")})
# Re-serialize the payload
payload = plistlib.dumps(payload, fmt=plistlib.FMT_BINARY)
# Construct APNS message
# Get the original fields except 3
fields = orig_payload[1]
fields = [field for field in fields if field[0] != 3]
# Add the new field
fields.append((3, payload))
payload = apns._serialize_payload(0xA, fields)
# Use the override payload
#print(payload, orig_payload)
#print(payload == orig_payload)
return payload
print() print()

View File

@ -1,84 +1,147 @@
# LOW LEVEL imessage function, decryption etc # LOW LEVEL imessage function, decryption etc
# Don't handle APNS etc, accept it already setup # Don't handle APNS etc, accept it already setup
## HAVE ANOTHER FILE TO SETUP EVERYTHING AUTOMATICALLY, etc ## HAVE ANOTHER FILE TO SETUP EVERYTHING AUTOMATICALLY, etc
# JSON parsing of keys, don't pass around strs?? # JSON parsing of keys, don't pass around strs??
import gzip
import logging
import plistlib
import random
import uuid
from dataclasses import dataclass, field
from hashlib import sha1, sha256
from io import BytesIO
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import apns import apns
import ids import ids
import plistlib
from io import BytesIO
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import gzip
from hashlib import sha1
import logging
logger = logging.getLogger("imessage") logger = logging.getLogger("imessage")
NORMAL_NONCE = b"\x00" * 15 + b"\x01" NORMAL_NONCE = b"\x00" * 15 + b"\x01" # This is always used as the AES nonce
class BalloonBody: class BalloonBody:
"""Represents the special parts of message extensions etc."""
def __init__(self, type: str, data: bytes): def __init__(self, type: str, data: bytes):
self.type = type self.type = type
self.data = data self.data = data
# TODO : Register handlers based on type id # TODO : Register handlers based on type id
@dataclass
class iMessage: class iMessage:
text: str """Represents an iMessage"""
text: str = ""
"""Plain text of message, always required, may be an empty string"""
xml: str | None = None xml: str | None = None
participants: list[str] """XML portion of message, may be None"""
sender: str participants: list[str] = field(default_factory=list)
id: str """List of participants in the message, including the sender"""
group_id: str sender: str | None = None
"""Sender of the message"""
_id: uuid.UUID | None = None
"""ID of the message, will be randomly generated if not provided"""
group_id: uuid.UUID | None = None
"""Group ID of the message, will be randomly generated if not provided"""
body: BalloonBody | None = None body: BalloonBody | None = None
"""BalloonBody, may be None"""
_compressed: bool = True
"""Internal property representing whether the message should be compressed"""
_raw: dict | None = None _raw: dict | None = None
"""Internal property representing the original raw message, may be None"""
def from_raw(message: dict) -> 'iMessage': def sanity_check(self):
self = iMessage() """Corrects any missing fields"""
if self._id is None:
self._id = uuid.uuid4()
self._raw = message if self.group_id is None:
self.group_id = uuid.uuid4()
self.text = message.get('t') if self.sender is None:
self.xml = message.get('x') if len(self.participants) > 1:
self.participants = message.get('p', []) self.sender = self.participants[-1]
if self.participants != []: else:
self.sender = self.participants[-1] logger.warning(
else: "Message has no sender, and only one participant, sanity check failed"
self.sender = None )
return False
self.id = message.get('r') if self.sender not in self.participants:
self.group_id = message.get('gid') self.participants.append(self.sender)
if 'bid' in message: if self.xml != None:
# This is a message extension body self._compressed = False # XML is never compressed for some reason
self.body = BalloonBody(message['bid'], message['b'])
return self return True
def to_raw(self) -> dict: def from_raw(message: bytes) -> "iMessage":
return { """Create an `iMessage` from raw message bytes"""
compressed = False
try:
message = gzip.decompress(message)
compressed = True
except:
pass
message = plistlib.loads(message)
return iMessage(
text=message.get("t", ""),
xml=message.get("x"),
participants=message.get("p", []),
sender=message.get("p", [])[-1] if message.get("p", []) != [] else None,
_id=uuid.UUID(message.get("r")),
group_id=uuid.UUID(message.get("gid")),
body=BalloonBody(message["bid"], message["b"])
if "bid" in message
else None,
_compressed=compressed,
_raw=message,
)
def to_raw(self) -> bytes:
"""Convert an `iMessage` to raw message bytes"""
if not self.sanity_check():
raise ValueError("Message failed sanity check")
d = {
"t": self.text, "t": self.text,
"x": self.xml, "x": self.xml,
"p": self.participants, "p": self.participants,
"r": self.id, "r": str(self._id).upper(),
"gid": self.group_id, "gid": str(self.group_id).upper(),
"pv": 0,
"gv": "8",
"v": "1",
} }
def __str__(self): # Remove keys that are None
if self._raw is not None: d = {k: v for k, v in d.items() if v is not None}
return str(self._raw)
else: # Serialize as a plist
return f"iMessage({self.text} from {self.sender})" d = plistlib.dumps(d, fmt=plistlib.FMT_BINARY)
# Compression
if self._compressed:
d = gzip.compress(d, mtime=0)
return d
class iMessageUser: class iMessageUser:
"""Represents a logged in and connected iMessage user.
This abstraction should probably be reworked into IDS some time..."""
def __init__(self, connection: apns.APNSConnection, user: ids.IDSUser): def __init__(self, connection: apns.APNSConnection, user: ids.IDSUser):
self.connection = connection self.connection = connection
@ -89,6 +152,7 @@ class iMessageUser:
Returns a raw APNs message corresponding to the next conforming notification in the queue Returns a raw APNs message corresponding to the next conforming notification in the queue
Returns None if no conforming notification is found Returns None if no conforming notification is found
""" """
def check_response(x): def check_response(x):
if x[0] != 0x0A: if x[0] != 0x0A:
return False return False
@ -96,14 +160,14 @@ class iMessageUser:
return False return False
resp_body = apns._get_field(x[1], 3) resp_body = apns._get_field(x[1], 3)
if resp_body is None: if resp_body is None:
#logger.debug("Rejecting madrid message with no body") # logger.debug("Rejecting madrid message with no body")
return False return False
resp_body = plistlib.loads(resp_body) resp_body = plistlib.loads(resp_body)
if "P" not in resp_body: if "P" not in resp_body:
#logger.debug(f"Rejecting madrid message with no payload : {resp_body}") # logger.debug(f"Rejecting madrid message with no payload : {resp_body}")
return False return False
return True return True
payload = self.connection.incoming_queue.pop_find(check_response) payload = self.connection.incoming_queue.pop_find(check_response)
if payload is None: if payload is None:
return None return None
@ -111,32 +175,110 @@ class iMessageUser:
return payload return payload
def _send_raw_message(self, message: dict):
pass
def _encrypt_message(self, message: dict) -> dict:
pass
def _sign_message(self, message: dict) -> dict:
pass
def _parse_payload(payload: bytes) -> tuple[bytes, bytes]: def _parse_payload(payload: bytes) -> tuple[bytes, bytes]:
payload = BytesIO(payload) payload = BytesIO(payload)
tag = payload.read(1) tag = payload.read(1)
#print("TAG", tag)
body_length = int.from_bytes(payload.read(2), "big") body_length = int.from_bytes(payload.read(2), "big")
body = payload.read(body_length) body = payload.read(body_length)
signature_len = payload.read(1)[0] signature_len = payload.read(1)[0]
signature = payload.read(signature_len) signature = payload.read(signature_len)
return (body, signature) return (body, signature)
def _construct_payload(body: bytes, signature: bytes) -> bytes:
payload = (
b"\x02"
+ len(body).to_bytes(2, "big")
+ body
+ len(signature).to_bytes(1, "big")
+ signature
)
return payload
def _hash_identity(id: bytes) -> bytes:
iden = ids.identity.IDSIdentity.decode(id)
# TODO: Combine this with serialization code in ids.identity
output = BytesIO()
output.write(b"\x00\x41\x04")
output.write(
ids._helpers.parse_key(iden.signing_public_key)
.public_numbers()
.x.to_bytes(32, "big")
)
output.write(
ids._helpers.parse_key(iden.signing_public_key)
.public_numbers()
.y.to_bytes(32, "big")
)
output.write(b"\x00\xAC")
output.write(b"\x30\x81\xA9")
output.write(b"\x02\x81\xA1")
output.write(
ids._helpers.parse_key(iden.encryption_public_key)
.public_numbers()
.n.to_bytes(161, "big")
)
output.write(b"\x02\x03\x01\x00\x01")
return sha256(output.getvalue()).digest()
def _encrypt_sign_payload(
self, key: ids.identity.IDSIdentity, message: bytes
) -> bytes:
# Generate a random AES key
random_seed = random.randbytes(11)
# Create the HMAC
import hmac
hm = hmac.new(
random_seed,
message
+ b"\x02"
+ iMessageUser._hash_identity(self.user.encryption_identity.encode())
+ iMessageUser._hash_identity(key.encode()),
sha256,
).digest()
aes_key = random_seed + hm[:5]
# print(len(aes_key))
# Encrypt the message with the AES key
cipher = Cipher(algorithms.AES(aes_key), modes.CTR(NORMAL_NONCE))
encrypted = cipher.encryptor().update(message)
# Encrypt the AES key with the public key of the recipient
recipient_key = ids._helpers.parse_key(key.encryption_public_key)
rsa_body = recipient_key.encrypt(
aes_key + encrypted[:100],
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None,
),
)
# Construct the payload
body = rsa_body + encrypted[100:]
sig = ids._helpers.parse_key(self.user.encryption_identity.signing_key).sign(
body, ec.ECDSA(hashes.SHA1())
)
payload = iMessageUser._construct_payload(body, sig)
return payload
def _decrypt_payload(self, payload: bytes) -> dict: def _decrypt_payload(self, payload: bytes) -> dict:
payload = iMessageUser._parse_payload(payload) payload = iMessageUser._parse_payload(payload)
body = BytesIO(payload[0]) body = BytesIO(payload[0])
rsa_body = ids._helpers.parse_key(self.user.encryption_identity.encryption_key).decrypt( rsa_body = ids._helpers.parse_key(
self.user.encryption_identity.encryption_key
).decrypt(
body.read(160), body.read(160),
padding.OAEP( padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()), mgf=padding.MGF1(algorithm=hashes.SHA1()),
@ -147,31 +289,20 @@ class iMessageUser:
cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE)) cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE))
decrypted = cipher.decryptor().update(rsa_body[16:] + body.read()) decrypted = cipher.decryptor().update(rsa_body[16:] + body.read())
# Try to gzip decompress the payload
try:
decrypted = gzip.decompress(decrypted)
except:
pass
return plistlib.loads(decrypted) return decrypted
def _verify_payload(self, payload: bytes, sender: str, sender_token: str) -> bool: def _verify_payload(self, payload: bytes, sender: str, sender_token: str) -> bool:
# Get the public key for the sender # Get the public key for the sender
lookup = self.user.lookup([sender])[sender] self._cache_keys([sender])
sender_iden = None if not sender_token in self.KEY_CACHE:
for identity in lookup['identities']: logger.warning("Unable to find the public key of the sender, cannot verify")
if identity['push-token'] == sender_token: return False
sender_iden = identity
break
identity_keys = sender_iden['client-data']['public-message-identity-key']
identity_keys = ids.identity.IDSIdentity.decode(identity_keys)
identity_keys = ids.identity.IDSIdentity.decode(self.KEY_CACHE[sender_token][0])
sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key) sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key)
payload = iMessageUser._parse_payload(payload) payload = iMessageUser._parse_payload(payload)
try: try:
@ -194,15 +325,126 @@ class iMessageUser:
return None return None
body = apns._get_field(raw[1], 3) body = apns._get_field(raw[1], 3)
body = plistlib.loads(body) body = plistlib.loads(body)
#print(f"Got body message {body}")
payload = body["P"] payload = body["P"]
if not self._verify_payload(payload, body['sP'], body["t"]):
raise Exception("Failed to verify payload")
decrypted = self._decrypt_payload(payload) decrypted = self._decrypt_payload(payload)
if "p" in decrypted:
if not self._verify_payload(payload, decrypted["p"][-1], body["t"]):
raise Exception("Failed to verify payload")
else:
logger.warning("Unable to verify, couldn't determine sender! Dropping message! (TODO work out a way to verify these anyway)")
return self.receive() # Call again to get the next message
return iMessage.from_raw(decrypted) return iMessage.from_raw(decrypted)
KEY_CACHE: dict[bytes, tuple[bytes, bytes]] = {}
"""Mapping of push token : (public key, session token)"""
USER_CACHE: dict[str, list[bytes]] = {}
"""Mapping of handle : [push tokens]"""
def _cache_keys(self, participants: list[str]):
# Check to see if we have cached the keys for all of the participants
if all([p in self.USER_CACHE for p in participants]):
return
# Look up the public keys for the participants, and cache a token : public key mapping
lookup = self.user.lookup(participants)
for key, participant in lookup.items():
if not key in self.USER_CACHE:
self.USER_CACHE[key] = []
for identity in participant["identities"]:
if not "client-data" in identity:
continue
if not "public-message-identity-key" in identity["client-data"]:
continue
if not "push-token" in identity:
continue
if not "session-token" in identity:
continue
self.USER_CACHE[key].append(identity["push-token"])
# print(identity)
self.KEY_CACHE[identity["push-token"]] = (
identity["client-data"]["public-message-identity-key"],
identity["session-token"],
)
def send(self, message: iMessage): def send(self, message: iMessage):
logger.error(f"Sending {message}") # Set the sender, if it isn't already
if message.sender is None:
message.sender = self.user.handles[0] # TODO : Which handle to use?
message.sanity_check() # Sanity check MUST be called before caching keys, so that the sender is added to the list of participants
self._cache_keys(message.participants)
# Turn the message into a raw message
raw = message.to_raw()
import base64
bundled_payloads = []
for participant in message.participants:
for push_token in self.USER_CACHE[participant]:
identity_keys = ids.identity.IDSIdentity.decode(
self.KEY_CACHE[push_token][0]
)
payload = self._encrypt_sign_payload(identity_keys, raw)
bundled_payloads.append(
{
"tP": participant,
"D": not participant
== message.sender, # TODO: Should this be false sometimes? For self messages?
"sT": self.KEY_CACHE[push_token][1],
"P": payload,
"t": push_token,
}
)
msg_id = random.randbytes(4)
body = {
"fcn": 1,
"c": 100,
"E": "pair",
"ua": "[macOS,13.4.1,22F82,MacBookPro18,3]",
"v": 8,
"i": int.from_bytes(msg_id, "big"),
"U": message._id.bytes,
"dtl": bundled_payloads,
"sP": message.sender,
}
body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY)
self.connection.send_message("com.apple.madrid", body, msg_id)
# This code can check to make sure we got a success response, but waiting for the response is annoying,
# so for now we just YOLO it and assume it worked
# def check_response(x):
# if x[0] != 0x0A:
# return False
# if apns._get_field(x[1], 2) != sha1("com.apple.madrid".encode()).digest():
# return False
# resp_body = apns._get_field(x[1], 3)
# if resp_body is None:
# return False
# resp_body = plistlib.loads(resp_body)
# if "c" not in resp_body or resp_body["c"] != 255:
# return False
# return True
# num_recv = 0
# while True:
# if num_recv == len(bundled_payloads):
# break
# payload = self.connection.incoming_queue.wait_pop_find(check_response)
# if payload is None:
# continue
# resp_body = apns._get_field(payload[1], 3)
# resp_body = plistlib.loads(resp_body)
# logger.error(resp_body)
# num_recv += 1

View File

@ -5,4 +5,5 @@ tlslite-ng==0.8.0a43
srp srp
pbkdf2 pbkdf2
unicorn unicorn
rich rich
prompt_toolkit