From 06ffa8a2ca25c68858e69d4370b1980e366d2a11 Mon Sep 17 00:00:00 2001 From: "R. M" <50295204+glomatico@users.noreply.github.com> Date: Thu, 7 Sep 2023 16:20:33 -0300 Subject: [PATCH] add return types and rename lyrics variable --- gamdl/cli.py | 8 ++-- gamdl/downloader.py | 98 +++++++++++++++++++++++++-------------------- 2 files changed, 59 insertions(+), 47 deletions(-) diff --git a/gamdl/cli.py b/gamdl/cli.py index f106265..dbe02f7 100644 --- a/gamdl/cli.py +++ b/gamdl/cli.py @@ -359,9 +359,9 @@ def main( cover_url = dl.get_cover_url(webplayback) if track["type"] == "songs": logger.debug("Getting lyrics") - unsynced_lyrics, synced_lyrics = dl.get_lyrics(track_id) + lyrics_unsynced, lyrics_synced = dl.get_lyrics(track_id) logger.debug("Getting tags") - tags = dl.get_tags_song(webplayback, unsynced_lyrics) + tags = dl.get_tags_song(webplayback, lyrics_unsynced) final_location = dl.get_final_location(tags) cover_location = dl.get_cover_location_song(final_location) lrc_location = dl.get_lrc_location(final_location) @@ -421,13 +421,13 @@ def main( else: logger.debug(f'Saving cover to "{cover_location}"') dl.save_cover(cover_location, cover_url) - if no_lrc or not synced_lyrics: + if no_lrc or not lyrics_synced: pass elif lrc_location.exists() and not overwrite: logger.warning(f'"{lrc_location}" already exists, skipping') else: logger.debug(f'Saving synced lyrics to "{lrc_location}"') - dl.make_lrc(lrc_location, synced_lyrics) + dl.make_lrc(lrc_location, lyrics_synced) if track["type"] == "music-videos": if ( not disable_music_video_album_skip diff --git a/gamdl/downloader.py b/gamdl/downloader.py index 8caf8c0..931f977 100644 --- a/gamdl/downloader.py +++ b/gamdl/downloader.py @@ -115,7 +115,7 @@ class Downloader: self.cdm = Cdm.from_device(Device.load(self.wvd_location)) self.cdm_session = self.cdm.open() - def get_download_queue(self, url): + def get_download_queue(self, url: str) -> tuple[str, list[dict]]: download_queue = [] track_id = url.split("/")[-1].split("i=")[-1].split("&")[0].split("?")[0] response = self.session.get( @@ -137,7 +137,7 @@ class Downloader: raise Exception("Criteria not met") return response["type"], download_queue - def get_webplayback(self, track_id): + def get_webplayback(self, track_id: str) -> dict: response = self.session.post( "https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback", json={ @@ -147,12 +147,12 @@ class Downloader: ).json()["songList"][0] return response - def get_stream_url_song(self, webplayback): + def get_stream_url_song(self, webplayback: dict) -> str: return next( i for i in webplayback["assets"] if i["flavor"] == self.songs_flavor )["URL"] - def get_stream_url_music_video(self, webplayback): + def get_stream_url_music_video(self, webplayback: dict) -> tuple[str, str]: ydl = YoutubeDL( { "allow_unplayable_formats": True, @@ -202,31 +202,31 @@ class Downloader: ) return stream_url_video, stream_url_audio - def get_encrypted_location_video(self, track_id): + def get_encrypted_location_video(self, track_id: str) -> Path: return self.temp_path / f"{track_id}_encrypted_video.mp4" - def get_encrypted_location_audio(self, track_id): + def get_encrypted_location_audio(self, track_id: str) -> Path: return self.temp_path / f"{track_id}_encrypted_audio.m4a" - def get_decrypted_location_video(self, track_id): + def get_decrypted_location_video(self, track_id: str) -> Path: return self.temp_path / f"{track_id}_decrypted_video.mp4" - def get_decrypted_location_audio(self, track_id): + def get_decrypted_location_audio(self, track_id: str) -> Path: return self.temp_path / f"{track_id}_decrypted_audio.m4a" - def get_fixed_location(self, track_id, file_extension): + def get_fixed_location(self, track_id: str, file_extension: str) -> Path: return self.temp_path / f"{track_id}_fixed{file_extension}" - def get_cover_location_song(self, final_location): + def get_cover_location_song(self, final_location: Path) -> Path: return final_location.parent / f"Cover.{self.cover_format}" - def get_cover_location_music_video(self, final_location): + def get_cover_location_music_video(self, final_location: Path) -> Path: return final_location.with_suffix(f".{self.cover_format}") - def get_lrc_location(self, final_location): + def get_lrc_location(self, final_location: Path) -> Path: return final_location.with_suffix(".lrc") - def download_yt_dlp(self, encrypted_location, stream_url): + def download_yt_dlp(self, encrypted_location: Path, stream_url: str) -> None: with YoutubeDL( { "quiet": True, @@ -238,7 +238,7 @@ class Downloader: ) as ydl: ydl.download(stream_url) - def download_nm3u8dlre(self, encrypted_location, stream_url): + def download_nm3u8dlre(self, encrypted_location: Path, stream_url: str) -> None: subprocess.run( [ self.nm3u8dlre_location, @@ -259,7 +259,7 @@ class Downloader: check=True, ) - def get_license_b64(self, challenge, track_uri, track_id): + def get_license_b64(self, challenge: str, track_uri: str, track_id: str) -> str: return self.session.post( "https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/acquireWebPlaybackLicense", json={ @@ -272,7 +272,7 @@ class Downloader: }, ).json()["license"] - def get_decryption_key_music_video(self, stream_url, track_id): + def get_decryption_key_music_video(self, stream_url: str, track_id: str) -> str: playlist = m3u8.load(stream_url) track_uri = next( i @@ -289,7 +289,7 @@ class Downloader: i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT" ).key.hex() - def get_decryption_key_song(self, stream_url, track_id): + def get_decryption_key_song(self, stream_url: str, track_id: str) -> str: track_uri = m3u8.load(stream_url).keys[0].uri widevine_pssh_data = WidevinePsshData() widevine_pssh_data.algorithm = 1 @@ -304,7 +304,7 @@ class Downloader: i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT" ).key.hex() - def get_synced_lyrics_lrc_timestamp(self, ttml_timestamp): + def get_lyrics_synced_lrc_timestamp(self, ttml_timestamp: str) -> str: mins = int(ttml_timestamp.split(":")[-2]) if ":" in ttml_timestamp else 0 secs, ms = str( float(ttml_timestamp.split(":")[-1]) @@ -324,7 +324,7 @@ class Downloader: ) return lrc_timestamp.strftime("%M:%S.%f")[:-4] - def get_lyrics(self, track_id): + def get_lyrics(self, track_id: str) -> tuple[str, str]: try: lyrics_ttml = ElementTree.fromstring( self.session.get( @@ -333,28 +333,28 @@ class Downloader: ) except: return None, None - unsynced_lyrics = "" - synced_lyrics = "" + lyrics_unsynced = "" + lyrics_synced = "" for div in lyrics_ttml.iter("{http://www.w3.org/ns/ttml}div"): for p in div.iter("{http://www.w3.org/ns/ttml}p"): if p.attrib.get("begin"): - synced_lyrics += f'[{self.get_synced_lyrics_lrc_timestamp(p.attrib.get("begin"))}]{p.text}\n' + lyrics_synced += f'[{self.get_lyrics_synced_lrc_timestamp(p.attrib.get("begin"))}]{p.text}\n' if p.text is not None: - unsynced_lyrics += p.text + "\n" - unsynced_lyrics += "\n" - return unsynced_lyrics[:-2], synced_lyrics + lyrics_unsynced += p.text + "\n" + lyrics_unsynced += "\n" + return lyrics_unsynced[:-2], lyrics_synced - def get_cover_url(self, webplayback): + def get_cover_url(self, webplayback: dict) -> str: return ( webplayback["artwork-urls"]["default"]["url"].rsplit("/", 1)[0] + f"/{self.cover_size}x{self.cover_size}bb.{self.cover_format}" ) @functools.lru_cache() - def get_cover(self, cover_url): + def get_cover(self, cover_url: str) -> bytes: return requests.get(cover_url).content - def get_tags_song(self, webplayback, unsynced_lyrics): + def get_tags_song(self, webplayback: dict, lyrics_unsynced: str) -> dict: flavor = next( i for i in webplayback["assets"] if i["flavor"] == self.songs_flavor ) @@ -381,7 +381,7 @@ class Downloader: "gapless": metadata["gapless"], "genre": metadata["genre"], "genre_id": metadata["genreId"], - "lyrics": unsynced_lyrics if unsynced_lyrics else None, + "lyrics": lyrics_unsynced if lyrics_unsynced else None, "media_type": 1, "rating": metadata["explicit"], "storefront": metadata["s"], @@ -394,7 +394,7 @@ class Downloader: } return tags - def get_tags_music_video(self, track_id): + def get_tags_music_video(self, track_id: str) -> dict: metadata = requests.get( f"https://itunes.apple.com/lookup", params={ @@ -438,7 +438,7 @@ class Downloader: tags["track_total"] = metadata[0]["trackCount"] return tags - def get_sanitized_string(self, dirty_string, is_folder): + def get_sanitized_string(self, dirty_string: str, is_folder: bool) -> str: dirty_string = re.sub(r'[\\/:*?"<>|;]', "_", dirty_string) if is_folder: dirty_string = dirty_string[: self.truncate] @@ -449,7 +449,7 @@ class Downloader: dirty_string = dirty_string[: self.truncate - 4] return dirty_string.strip() - def get_final_location(self, tags): + def get_final_location(self, tags: dict) -> Path: if "album" in tags: final_location_folder = ( self.template_folder_compilation.split("/") @@ -480,7 +480,9 @@ class Downloader: *final_location_file ) - def decrypt(self, encrypted_location, decrypted_location, decryption_key): + def decrypt( + self, encrypted_location: Path, decrypted_location: Path, decryption_key: str + ) -> None: subprocess.run( [ self.mp4decrypt_location, @@ -491,7 +493,7 @@ class Downloader: ], ) - def fixup_song_mp4box(self, decrypted_location, fixed_location): + def fixup_song_mp4box(self, decrypted_location: Path, fixed_location: Path) -> None: subprocess.run( [ self.mp4box_location, @@ -506,7 +508,10 @@ class Downloader: ) def fixup_music_video_mp4box( - self, decrypted_location_audio, decrypted_location_video, fixed_location + self, + decrypted_location_audio: Path, + decrypted_location_video: Path, + fixed_location: Path, ): subprocess.run( [ @@ -524,7 +529,9 @@ class Downloader: check=True, ) - def fixup_song_ffmpeg(self, encrypted_location, decryption_key, fixed_location): + def fixup_song_ffmpeg( + self, encrypted_location: Path, decryption_key: str, fixed_location: Path + ) -> None: subprocess.run( [ self.ffmpeg_location, @@ -545,8 +552,11 @@ class Downloader: ) def fixup_music_video_ffmpeg( - self, decrypted_location_video, decrypted_location_audio, fixed_location - ): + self, + decrypted_location_video: Path, + decrypted_location_audio: Path, + fixed_location: Path, + ) -> None: subprocess.run( [ self.ffmpeg_location, @@ -570,7 +580,7 @@ class Downloader: check=True, ) - def apply_tags(self, fixed_location, tags, cover_url): + def apply_tags(self, fixed_location: Path, tags: dict, cover_url: str) -> None: mp4_tags = { v: [tags[k]] for k, v in MP4_TAGS_MAP.items() @@ -606,15 +616,17 @@ class Downloader: mp4.update(mp4_tags) mp4.save() - def move_to_final_location(self, fixed_location, final_location): + def move_to_final_location( + self, fixed_location: Path, final_location: Path + ) -> None: final_location.parent.mkdir(parents=True, exist_ok=True) shutil.move(fixed_location, final_location) - def save_cover(self, cover_location, cover_url): + def save_cover(self, cover_location: Path, cover_url: str) -> None: with open(cover_location, "wb") as f: f.write(self.get_cover(cover_url)) - def make_lrc(self, lrc_location, synced_lyrics): + def make_lrc(self, lrc_location: Path, lyrics_synced: str) -> None: lrc_location.parent.mkdir(parents=True, exist_ok=True) with open(lrc_location, "w", encoding="utf8") as f: - f.write(synced_lyrics) + f.write(lyrics_synced)