add return types and rename lyrics variable

This commit is contained in:
R. M 2023-09-07 16:20:33 -03:00
parent 12cf08d489
commit 06ffa8a2ca
2 changed files with 59 additions and 47 deletions

View File

@ -359,9 +359,9 @@ def main(
cover_url = dl.get_cover_url(webplayback)
if track["type"] == "songs":
logger.debug("Getting lyrics")
unsynced_lyrics, synced_lyrics = dl.get_lyrics(track_id)
lyrics_unsynced, lyrics_synced = dl.get_lyrics(track_id)
logger.debug("Getting tags")
tags = dl.get_tags_song(webplayback, unsynced_lyrics)
tags = dl.get_tags_song(webplayback, lyrics_unsynced)
final_location = dl.get_final_location(tags)
cover_location = dl.get_cover_location_song(final_location)
lrc_location = dl.get_lrc_location(final_location)
@ -421,13 +421,13 @@ def main(
else:
logger.debug(f'Saving cover to "{cover_location}"')
dl.save_cover(cover_location, cover_url)
if no_lrc or not synced_lyrics:
if no_lrc or not lyrics_synced:
pass
elif lrc_location.exists() and not overwrite:
logger.warning(f'"{lrc_location}" already exists, skipping')
else:
logger.debug(f'Saving synced lyrics to "{lrc_location}"')
dl.make_lrc(lrc_location, synced_lyrics)
dl.make_lrc(lrc_location, lyrics_synced)
if track["type"] == "music-videos":
if (
not disable_music_video_album_skip

View File

@ -115,7 +115,7 @@ class Downloader:
self.cdm = Cdm.from_device(Device.load(self.wvd_location))
self.cdm_session = self.cdm.open()
def get_download_queue(self, url):
def get_download_queue(self, url: str) -> tuple[str, list[dict]]:
download_queue = []
track_id = url.split("/")[-1].split("i=")[-1].split("&")[0].split("?")[0]
response = self.session.get(
@ -137,7 +137,7 @@ class Downloader:
raise Exception("Criteria not met")
return response["type"], download_queue
def get_webplayback(self, track_id):
def get_webplayback(self, track_id: str) -> dict:
response = self.session.post(
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback",
json={
@ -147,12 +147,12 @@ class Downloader:
).json()["songList"][0]
return response
def get_stream_url_song(self, webplayback):
def get_stream_url_song(self, webplayback: dict) -> str:
return next(
i for i in webplayback["assets"] if i["flavor"] == self.songs_flavor
)["URL"]
def get_stream_url_music_video(self, webplayback):
def get_stream_url_music_video(self, webplayback: dict) -> tuple[str, str]:
ydl = YoutubeDL(
{
"allow_unplayable_formats": True,
@ -202,31 +202,31 @@ class Downloader:
)
return stream_url_video, stream_url_audio
def get_encrypted_location_video(self, track_id):
def get_encrypted_location_video(self, track_id: str) -> Path:
return self.temp_path / f"{track_id}_encrypted_video.mp4"
def get_encrypted_location_audio(self, track_id):
def get_encrypted_location_audio(self, track_id: str) -> Path:
return self.temp_path / f"{track_id}_encrypted_audio.m4a"
def get_decrypted_location_video(self, track_id):
def get_decrypted_location_video(self, track_id: str) -> Path:
return self.temp_path / f"{track_id}_decrypted_video.mp4"
def get_decrypted_location_audio(self, track_id):
def get_decrypted_location_audio(self, track_id: str) -> Path:
return self.temp_path / f"{track_id}_decrypted_audio.m4a"
def get_fixed_location(self, track_id, file_extension):
def get_fixed_location(self, track_id: str, file_extension: str) -> Path:
return self.temp_path / f"{track_id}_fixed{file_extension}"
def get_cover_location_song(self, final_location):
def get_cover_location_song(self, final_location: Path) -> Path:
return final_location.parent / f"Cover.{self.cover_format}"
def get_cover_location_music_video(self, final_location):
def get_cover_location_music_video(self, final_location: Path) -> Path:
return final_location.with_suffix(f".{self.cover_format}")
def get_lrc_location(self, final_location):
def get_lrc_location(self, final_location: Path) -> Path:
return final_location.with_suffix(".lrc")
def download_yt_dlp(self, encrypted_location, stream_url):
def download_yt_dlp(self, encrypted_location: Path, stream_url: str) -> None:
with YoutubeDL(
{
"quiet": True,
@ -238,7 +238,7 @@ class Downloader:
) as ydl:
ydl.download(stream_url)
def download_nm3u8dlre(self, encrypted_location, stream_url):
def download_nm3u8dlre(self, encrypted_location: Path, stream_url: str) -> None:
subprocess.run(
[
self.nm3u8dlre_location,
@ -259,7 +259,7 @@ class Downloader:
check=True,
)
def get_license_b64(self, challenge, track_uri, track_id):
def get_license_b64(self, challenge: str, track_uri: str, track_id: str) -> str:
return self.session.post(
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/acquireWebPlaybackLicense",
json={
@ -272,7 +272,7 @@ class Downloader:
},
).json()["license"]
def get_decryption_key_music_video(self, stream_url, track_id):
def get_decryption_key_music_video(self, stream_url: str, track_id: str) -> str:
playlist = m3u8.load(stream_url)
track_uri = next(
i
@ -289,7 +289,7 @@ class Downloader:
i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT"
).key.hex()
def get_decryption_key_song(self, stream_url, track_id):
def get_decryption_key_song(self, stream_url: str, track_id: str) -> str:
track_uri = m3u8.load(stream_url).keys[0].uri
widevine_pssh_data = WidevinePsshData()
widevine_pssh_data.algorithm = 1
@ -304,7 +304,7 @@ class Downloader:
i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT"
).key.hex()
def get_synced_lyrics_lrc_timestamp(self, ttml_timestamp):
def get_lyrics_synced_lrc_timestamp(self, ttml_timestamp: str) -> str:
mins = int(ttml_timestamp.split(":")[-2]) if ":" in ttml_timestamp else 0
secs, ms = str(
float(ttml_timestamp.split(":")[-1])
@ -324,7 +324,7 @@ class Downloader:
)
return lrc_timestamp.strftime("%M:%S.%f")[:-4]
def get_lyrics(self, track_id):
def get_lyrics(self, track_id: str) -> tuple[str, str]:
try:
lyrics_ttml = ElementTree.fromstring(
self.session.get(
@ -333,28 +333,28 @@ class Downloader:
)
except:
return None, None
unsynced_lyrics = ""
synced_lyrics = ""
lyrics_unsynced = ""
lyrics_synced = ""
for div in lyrics_ttml.iter("{http://www.w3.org/ns/ttml}div"):
for p in div.iter("{http://www.w3.org/ns/ttml}p"):
if p.attrib.get("begin"):
synced_lyrics += f'[{self.get_synced_lyrics_lrc_timestamp(p.attrib.get("begin"))}]{p.text}\n'
lyrics_synced += f'[{self.get_lyrics_synced_lrc_timestamp(p.attrib.get("begin"))}]{p.text}\n'
if p.text is not None:
unsynced_lyrics += p.text + "\n"
unsynced_lyrics += "\n"
return unsynced_lyrics[:-2], synced_lyrics
lyrics_unsynced += p.text + "\n"
lyrics_unsynced += "\n"
return lyrics_unsynced[:-2], lyrics_synced
def get_cover_url(self, webplayback):
def get_cover_url(self, webplayback: dict) -> str:
return (
webplayback["artwork-urls"]["default"]["url"].rsplit("/", 1)[0]
+ f"/{self.cover_size}x{self.cover_size}bb.{self.cover_format}"
)
@functools.lru_cache()
def get_cover(self, cover_url):
def get_cover(self, cover_url: str) -> bytes:
return requests.get(cover_url).content
def get_tags_song(self, webplayback, unsynced_lyrics):
def get_tags_song(self, webplayback: dict, lyrics_unsynced: str) -> dict:
flavor = next(
i for i in webplayback["assets"] if i["flavor"] == self.songs_flavor
)
@ -381,7 +381,7 @@ class Downloader:
"gapless": metadata["gapless"],
"genre": metadata["genre"],
"genre_id": metadata["genreId"],
"lyrics": unsynced_lyrics if unsynced_lyrics else None,
"lyrics": lyrics_unsynced if lyrics_unsynced else None,
"media_type": 1,
"rating": metadata["explicit"],
"storefront": metadata["s"],
@ -394,7 +394,7 @@ class Downloader:
}
return tags
def get_tags_music_video(self, track_id):
def get_tags_music_video(self, track_id: str) -> dict:
metadata = requests.get(
f"https://itunes.apple.com/lookup",
params={
@ -438,7 +438,7 @@ class Downloader:
tags["track_total"] = metadata[0]["trackCount"]
return tags
def get_sanitized_string(self, dirty_string, is_folder):
def get_sanitized_string(self, dirty_string: str, is_folder: bool) -> str:
dirty_string = re.sub(r'[\\/:*?"<>|;]', "_", dirty_string)
if is_folder:
dirty_string = dirty_string[: self.truncate]
@ -449,7 +449,7 @@ class Downloader:
dirty_string = dirty_string[: self.truncate - 4]
return dirty_string.strip()
def get_final_location(self, tags):
def get_final_location(self, tags: dict) -> Path:
if "album" in tags:
final_location_folder = (
self.template_folder_compilation.split("/")
@ -480,7 +480,9 @@ class Downloader:
*final_location_file
)
def decrypt(self, encrypted_location, decrypted_location, decryption_key):
def decrypt(
self, encrypted_location: Path, decrypted_location: Path, decryption_key: str
) -> None:
subprocess.run(
[
self.mp4decrypt_location,
@ -491,7 +493,7 @@ class Downloader:
],
)
def fixup_song_mp4box(self, decrypted_location, fixed_location):
def fixup_song_mp4box(self, decrypted_location: Path, fixed_location: Path) -> None:
subprocess.run(
[
self.mp4box_location,
@ -506,7 +508,10 @@ class Downloader:
)
def fixup_music_video_mp4box(
self, decrypted_location_audio, decrypted_location_video, fixed_location
self,
decrypted_location_audio: Path,
decrypted_location_video: Path,
fixed_location: Path,
):
subprocess.run(
[
@ -524,7 +529,9 @@ class Downloader:
check=True,
)
def fixup_song_ffmpeg(self, encrypted_location, decryption_key, fixed_location):
def fixup_song_ffmpeg(
self, encrypted_location: Path, decryption_key: str, fixed_location: Path
) -> None:
subprocess.run(
[
self.ffmpeg_location,
@ -545,8 +552,11 @@ class Downloader:
)
def fixup_music_video_ffmpeg(
self, decrypted_location_video, decrypted_location_audio, fixed_location
):
self,
decrypted_location_video: Path,
decrypted_location_audio: Path,
fixed_location: Path,
) -> None:
subprocess.run(
[
self.ffmpeg_location,
@ -570,7 +580,7 @@ class Downloader:
check=True,
)
def apply_tags(self, fixed_location, tags, cover_url):
def apply_tags(self, fixed_location: Path, tags: dict, cover_url: str) -> None:
mp4_tags = {
v: [tags[k]]
for k, v in MP4_TAGS_MAP.items()
@ -606,15 +616,17 @@ class Downloader:
mp4.update(mp4_tags)
mp4.save()
def move_to_final_location(self, fixed_location, final_location):
def move_to_final_location(
self, fixed_location: Path, final_location: Path
) -> None:
final_location.parent.mkdir(parents=True, exist_ok=True)
shutil.move(fixed_location, final_location)
def save_cover(self, cover_location, cover_url):
def save_cover(self, cover_location: Path, cover_url: str) -> None:
with open(cover_location, "wb") as f:
f.write(self.get_cover(cover_url))
def make_lrc(self, lrc_location, synced_lyrics):
def make_lrc(self, lrc_location: Path, lyrics_synced: str) -> None:
lrc_location.parent.mkdir(parents=True, exist_ok=True)
with open(lrc_location, "w", encoding="utf8") as f:
f.write(synced_lyrics)
f.write(lyrics_synced)