From da483b995b9e6705f55da83dc17072bdc2a895b2 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 15 Aug 2023 16:28:02 -0400 Subject: [PATCH] fix some bugs, remove old parser --- demo.py | 27 +++------ imessage.py | 154 ++++++++-------------------------------------------- 2 files changed, 33 insertions(+), 148 deletions(-) diff --git a/demo.py b/demo.py index 38e822d..c1015d9 100644 --- a/demo.py +++ b/demo.py @@ -255,27 +255,18 @@ while True: elif current_participants != []: if msg.startswith("\\"): msg = msg[1:] - im.send( - imessage.OldiMessage( - text=msg, - participants=current_participants, - sender=user.current_handle, - effect=current_effect, - ) + + imsg = imessage.iMessage.create( + im, + msg, + current_participants, ) + + imsg.effect = current_effect + + im.send(imsg) current_effect = None else: print("No chat selected, use help for help") time.sleep(0.1) - - # elif msg.startswith('send') or msg.startswith('s'): - # msg = msg.split(' ') - # if len(msg) < 3: - # print('send [recipient] [message]') - # else: - # im.send(imessage.iMessage( - # text=' '.join(msg[2:]), - # participants=[msg[1], user.handles[0]], - # #sender=user.handles[0] - # )) diff --git a/imessage.py b/imessage.py index a7e5be1..cefefd4 100644 --- a/imessage.py +++ b/imessage.py @@ -1,9 +1,3 @@ -# LOW LEVEL imessage function, decryption etc -# Don't handle APNS etc, accept it already setup - -## HAVE ANOTHER FILE TO SETUP EVERYTHING AUTOMATICALLY, etc -# JSON parsing of keys, don't pass around strs?? - import base64 import gzip import logging @@ -138,7 +132,7 @@ class Message: sender: str participants: list[str] id: uuid.UUID - _raw: dict + _raw: dict | None = None _compressed: bool = True xml: str | None = None @@ -218,6 +212,19 @@ class SMSIncomingImage(Message): class iMessage(Message): effect: str | None = None + def create(user: "iMessageUser", text: str, participants: list[str]) -> "iMessage": + """Creates a basic outgoing `iMessage` from the given text and participants""" + + sender = user.user.current_handle + participants += [sender] + + return iMessage( + text=text, + sender=sender, + participants=participants, + id=uuid.uuid4(), + ) + def from_raw(message: bytes, sender: str | None = None) -> "iMessage": """Create a `iMessage` from raw message bytes""" @@ -236,129 +243,21 @@ class iMessage(Message): text=message["t"], participants=message["p"], sender=sender, - id=uuid.UUID(message["r"]), + id=uuid.UUID(message["r"]) if "r" in message else None, xml=message["x"] if "x" in message else None, _raw=message, _compressed=compressed, effect=message["iid"] if "iid" in message else None, ) - def __str__(self): - return f"[iMessage {self.sender}] '{self.text}'" - -@dataclass -class OldiMessage: - """Represents an iMessage""" - - text: str = "" - """Plain text of message, always required, may be an empty string""" - xml: str | None = None - """XML portion of message, may be None""" - participants: list[str] = field(default_factory=list) - """List of participants in the message, including the sender""" - sender: str | None = None - """Sender of the message""" - id: uuid.UUID | None = None - """ID of the message, will be randomly generated if not provided""" - group_id: uuid.UUID | None = None - """Group ID of the message, will be randomly generated if not provided""" - body: BalloonBody | None = None - """BalloonBody, may be None""" - effect: str | None = None - """iMessage effect sent with this message, may be None""" - - _compressed: bool = True - """Internal property representing whether the message should be compressed""" - - _raw: dict | None = None - """Internal property representing the original raw message, may be None""" - - def attachments(self) -> list[Attachment]: - if self.xml is not None: - return [ - Attachment(self._raw, elem) - for elem in ElementTree.fromstring(self.xml)[0] - if elem.tag == "FILE" - ] - else: - return [] - - def sanity_check(self): - """Corrects any missing fields""" - if self.id is None: - self.id = uuid.uuid4() - - if self.group_id is None: - self.group_id = uuid.uuid4() - - if self.sender is None: - if len(self.participants) > 1: - self.sender = self.participants[-1] - else: - logger.warning( - "Message has no sender, and only one participant, sanity check failed" - ) - return False - - if self.sender not in self.participants: - self.participants.append(self.sender) - - if self.xml != None: - self._compressed = False # XML is never compressed for some reason - - return True - - def from_raw(message: bytes, sender: str | None = None) -> "OldiMessage": - """Create an `iMessage` from raw message bytes""" - compressed = False - try: - message = gzip.decompress(message) - compressed = True - except: - pass - - message = plistlib.loads(message) - - logger.debug(f"Decompressed message : {message}") - - try: - return OldiMessage( - text=message[ - "t" - ], # Cause it to "fail to parse" if there isn't any good text to display, temp hack - xml=message.get("x"), - participants=message.get("p", []), - sender=sender - if sender is not None - else message.get("p", [])[-1] - if "p" in message - else None, - id=uuid.UUID(message.get("r")) if "r" in message else None, - group_id=uuid.UUID(message.get("gid")) if "gid" in message else None, - body=BalloonBody(message["bid"], message["b"]) - if "bid" in message and "b" in message - else None, - effect=message["iid"] if "iid" in message else None, - _compressed=compressed, - _raw=message, - ) - except: - #import json - - dmp = str(message) - return OldiMessage(text=f"failed to parse: {dmp}", _raw=message) - def to_raw(self) -> bytes: """Convert an `iMessage` to raw message bytes""" - if not self.sanity_check(): - raise ValueError("Message failed sanity check") d = { "t": self.text, "x": self.xml, "p": self.participants, "r": str(self.id).upper(), - "gid": str(self.group_id).upper(), "pv": 0, "gv": "8", "v": "1", @@ -376,13 +275,9 @@ class OldiMessage: d = gzip.compress(d, mtime=0) return d - - def to_string(self) -> str: - message_str = f"[{self.sender}] '{self.text}'" - if self.effect is not None: - message_str += f" with effect [{self.effect}]" - return message_str - + + def __str__(self): + return f"[iMessage {self.sender}] '{self.text}'" class iMessageUser: """Represents a logged in and connected iMessage user. @@ -570,7 +465,11 @@ class iMessageUser: decrypted = self._decrypt_payload(body["P"]) - return t.from_raw(decrypted, body["sP"]) + try: + return t.from_raw(decrypted, body["sP"]) + except Exception as e: + logger.error(f"Failed to parse message : {e}") + return None KEY_CACHE_HANDLE: str = "" KEY_CACHE: dict[bytes, dict[str, tuple[bytes, bytes]]] = {} @@ -735,12 +634,7 @@ class iMessageUser: } ) - def send(self, message: OldiMessage): - # Set the sender, if it isn't already - if message.sender is None: - message.sender = self.user.handles[0] # TODO : Which handle to use? - - message.sanity_check() # Sanity check MUST be called before caching keys, so that the sender is added to the list of participants + def send(self, message: "iMessage"): self._cache_keys(message.participants, "com.apple.madrid") self._send_raw(