Merge pull request #91 from glomatico/dev

Dev
This commit is contained in:
Rafael Moraes 2024-04-08 15:34:29 -03:00 committed by GitHub
commit 80c2afde63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 2145 additions and 1166 deletions

185
README.md
View File

@ -1,36 +1,37 @@
# gamdl - Glomatico's Apple Music Downloader
A Python script to download Apple Music songs/music videos/albums/playlists. This is a rework of https://github.com/loveyoursupport/AppleMusic-Downloader/tree/661a274d62586b521feec5a7de6bee0e230fdb7d.
# Glomatico's Apple Music Downloader
A Python script to download Apple Music songs/music videos/albums/playlists/post videos.
## Features
* Download songs in 256kbps AAC or in 64kbps HE-AAC
* Download songs in AAC/Spatial AAC/Dolby Atmos/ALAC*
* Download music videos up to 4K
* Download synced lyrics
* Download synced lyrics in LRC, SRT or TTML
* Choose between FFmpeg and MP4Box for remuxing
* Choose between yt-dlp and N_m3u8DL-RE for downloading
* Highly customizable
## Prerequisites
* Python 3.8 or higher
* The cookies file of your Apple Music account (requires an active subscription)
* You can get your cookies by using one of the following extensions on your browser of choice at the Apple Music website with your account signed in:
* Firefox: https://addons.mozilla.org/addon/export-cookies-txt
* Chromium based browsers: https://chrome.google.com/webstore/detail/gdocmgbfkjnnpapoeobnolbbkoibbcif
* FFmpeg on your system PATH
* Older versions of FFmpeg may not work.
* Up to date binaries can be obtained from the links below:
* Windows: https://github.com/AnimMouse/ffmpeg-stable-autobuild/releases
* Linux: https://johnvansickle.com/ffmpeg/
* (Optional) mp4decrypt on your system PATH
* Required to download music videos and songs in non-legacy formats.
* Binaries can be obtained from here: https://www.bento4.com/downloads/.
## Installation
1. Install Python 3.7 or higher
2. Add [FFmpeg](https://ffmpeg.org/download.html) and [mp4decrypt](https://www.bento4.com/downloads/) to PATH
* mp4decrypt is only needed if you want to download music videos
3. Place your cookies in the same folder that you will run gamdl as `cookies.txt`
* You can export your cookies by using this Google Chrome extension on Apple Music website: https://chrome.google.com/webstore/detail/open-cookiestxt/gdocmgbfkjnnpapoeobnolbbkoibbcif. Make sure to be logged in.
4. Place your .wvd file in the same folder that you will run gamdl as `device.wvd`
* To get a .wvd file, you can use [dumper](https://github.com/wvdumper/dumper) to dump a L3 CDM from an Android device. Once you have the L3 CDM, use pywidevine to create the .wvd file from it.
1. Install pywidevine with pip
```bash
pip install pywidevine pyyaml
```
2. Create the .wvd file
```bash
pywidevine create-device -t ANDROID -l 3 -k private_key.pem -c client_id.bin -o .
```
5. Install gamdl using pip
1. Install the package `gamdl` using pip
```bash
pip install gamdl
```
2. Place your cookies in the same directory you will run the script from and name it as `cookies.txt`
## Examples
## Usage
* Download a song
```bash
gamdl "https://music.apple.com/us/album/never-gonna-give-you-up-2022-remaster/1626265761?i=1626265765"
@ -41,45 +42,44 @@ A Python script to download Apple Music songs/music videos/albums/playlists. Thi
```
## Configuration
You can configure gamdl by using the command line arguments, environment variables, or the config file. The config file is created automatically when you run gamdl for the first time at `~/.gamdl/config.json` on Linux and `%USERPROFILE%\.gamdl\config.json` on Windows. Environment variables are prefixed with `GAMDL_` and are the capitalized config file key for their respective option. Config file values and environment variables can be overridden using command line arguments.
| Command line argument / Config file key | Description | Default value |
| --------------------------------------------------------------- | ---------------------------------------------------------------------- | ---------------------------------- |
| `-f`, `--final-path` / `final_path` | Path where the downloaded files will be saved. | `./Apple Music` |
| `-t`, `--temp-path` / `temp_path` | Path where the temporary files will be saved. | `./temp` |
| `-c`, `--cookies-location` / `cookies_location` | Location of the cookies file. | `./cookies.txt` |
| `-w`, `--wvd-location` / `wvd_location` | Location of the .wvd file. | `./device.wvd` |
| `--ffmpeg-location` / `ffmpeg_location` | Location of the FFmpeg binary. | `ffmpeg` |
| `--mp4box-location` / `mp4box_location` | Location of the MP4Box binary. | `MP4Box` |
| `--mp4decrypt-location` / `mp4decrypt_location` | Location of the mp4decrypt binary. | `mp4decrypt` |
| `--nm3u8dlre-location` / `nm3u8dlre_location` | Location of the N_m3u8DL-RE binary. | `N_m3u8DL-RE` |
| `--config-location` / - | Location of the config file. | `<home_folder>/.gamdl/config.json` |
| `--template-folder-album` / `template_folder_album` | Template of the album folders as a format string. | `{album_artist}/{album}` |
| `--template-folder-compilation` / `template_folder_compilation` | Template of the compilation album folders as a format string. | `Compilations/{album}` |
| `--template-file-single-disc` / `template_file_single_disc` | Template of the track files for single-disc albums as a format string. | `{track:02d} {title}` |
| `--template-file-multi-disc` / `template_file_multi_disc` | Template of the track files for multi-disc albums as a format string. | `{disc}-{track:02d} {title}` |
| `--template-folder-music-video` / `template_folder_music_video` | Template of the music video folders as a format string. | `{artist}/Unknown Album` |
| `--template-file-music-video` / `template_file_music_video` | Template of the music video files as a format string. | `{title}` |
| `--cover-size` / `cover_size` | Size of the cover. | `1200` |
| `--template-date` / `template_date` | Template of the tagged date as a string with format codes. | `%Y-%m-%dT%H:%M:%SZ` |
| `--cover-format` / `cover_format` | Format of the cover. | `jpg` |
| `--remux-mode` / `remux_mode` | Remux mode. | `ffmpeg` |
| `--download-mode` / `download_mode` | Download mode. | `ytdlp` |
| `-e`, `--exclude-tags` / `exclude_tags` | List of tags to exclude from file tagging separated by commas. | `null` |
| `--truncate` / `truncate` | Maximum length of the file/folder names. | `40` |
| `-l`, `--log-level` / `log_level` | Log level. | `INFO` |
| `--prefer-hevc` / `prefer_hevc` | Prefer HEVC over AVC when downloading music videos. | `false` |
| `--prefer-account-language` / `prefer_account_language` | Prefer the language associated with the account rather than English. | `false` |
| `--ask-video-format` / `ask_video_format` | Ask for the video format when downloading music videos. | `false` |
| `--disable-music-video-skip` / `disable_music_video_skip` | Don't skip downloading music videos in albums/playlists. | `false` |
| `-l`, `--lrc-only` / `lrc_only` | Download only the synced lyrics. | `false` |
| `-n`, `--no-lrc` / `no_lrc` | Don't download the synced lyrics. | `false` |
| `-s`, `--save-cover` / `save_cover` | Save cover as a separate file. | `false` |
| `--songs-heaac` / `songs_heaac` | Download songs in HE-AAC 64kbps. | `false` |
| `-o`, `--overwrite` / `overwrite` | Overwrite existing files. | `false` |
| `--print-exceptions` / `print_exceptions` | Print exceptions. | `false` |
| `-u`, `--url-txt` / - | Read URLs as location of text files containing URLs. | `false` |
| `-n`, `--no-config-file` / - | Don't use the config file. | `false` |
You can configure gamdl by using the command line arguments or the config file. The config file is created automatically when you run gamdl for the first time at `~/.gamdl/config.json` on Linux and `%USERPROFILE%\.gamdl\config.json` on Windows. Config file values can be overridden using command line arguments.
| Command line argument / Config file key | Description | Default value |
| --------------------------------------------------------------- | ------------------------------------------------------------------ | -------------------------------------------- |
| `--disable-music-video-skip` / `disable_music_video_skip` | Don't skip downloading music videos in albums/playlists. | `false` |
| `--save-cover`, `-s` / `save_cover` | Save cover as a separate file. | `false` |
| `--overwrite` / `overwrite` | Overwrite existing files. | `false` |
| `--read-urls-as-txt`, `-r` / - | Interpret URLs as paths to text files containing URLs. | `false` |
| `--synced-lyrics-only` / `synced_lyrics_only` | Download only the synced lyrics. | `false` |
| `--no-synced-lyrics` / `no_synced_lyrics` | Don't download the synced lyrics. | `false` |
| `--config-path` / - | Path to config file. | `<home>/.spotify-web-downloader/config.json` |
| `--log-level` / `log_level` | Log level. | `INFO` |
| `--print-exceptions` / `print_exceptions` | Print exceptions. | `false` |
| `--cookies-path`, `-c` / `cookies_path` | Path to .txt cookies file. | `./cookies.txt` |
| `--output-path`, `-o` / `output_path` | Path to output directory. | `./Apple Music` |
| `--temp-path` / `temp_path` | Path to temporary directory. | `./temp` |
| `--wvd-path` / `wvd_path` | Path to .wvd file. | `null` |
| `--nm3u8dlre-path` / `nm3u8dlre_path` | Path to N_m3u8DL-RE binary. | `N_m3u8dl-RE` |
| `--mp4decrypt-path` / `mp4decrypt_path` | Path to mp4decrypt binary. | `mp4decrypt` |
| `--ffmpeg-path` / `ffmpeg_path` | Path to FFmpeg binary. | `ffmpeg` |
| `--mp4box-path` / `mp4box_path` | Path to MP4Box binary. | `MP4Box` |
| `--download-mode` / `download_mode` | Download mode. | `ytdlp` |
| `--remux-mode` / `remux_mode` | Remux mode. | `ffmpeg` |
| `--cover-format` / `cover_format` | Cover format. | `jpg` |
| `--template-folder-album` / `template_folder_album` | Template folder for tracks that are part of an album. | `{album_artist}/{album}` |
| `--template-folder-compilation` / `template_folder_compilation` | Template folder for tracks that are part of a compilation album. | `Compilations/{album}` |
| `--template-file-single-disc` / `template_file_single_disc` | Template file for the tracks that are part of a single-disc album. | `{track:02d} {title}` |
| `--template-file-multi-disc` / `template_file_multi_disc` | Template file for the tracks that are part of a multi-disc album. | `{disc}-{track:02d} {title}` |
| `--template-folder-no-album` / `template_folder_no_album` | Template folder for the tracks that are not part of an album. | `{artist}/Unknown Album` |
| `--template-file-no-album` / `template_file_no_album` | Template file for the tracks that are not part of an album. | `{title}` |
| `--template-date` / `template_date` | Date tag template. | `%Y-%m-%dT%H:%M:%SZ` |
| `--exclude-tags` / `exclude_tags` | Comma-separated tags to exclude. | `null` |
| `--cover-size` / `cover_size` | Cover size. | `1200` |
| `--truncate` / `truncate` | Maximum length of the file/folder names. | `40` |
| `--codec-song` / `codec_song` | Song codec. | `aac-legacy` |
| `--synced-lyrics-format` / `synced_lyrics_format` | Synced lyrics format. | `lrc` |
| `--codec-music-video` / `codec_music_video` | Music video codec. | `h264-best` |
| `--quality-post` / `quality_post` | Post video quality. | `best` |
| `--no-config-file`, `-n` / - | Do not use a config file. | `false` |
### Tags variables
The following variables can be used in the template folders/files and/or in the `exclude_tags` list:
@ -113,18 +113,17 @@ The following variables can be used in the template folders/files and/or in the
* `track`
* `track_total`
* `xid`
### Remux mode
### Remux modes
The following remux modes are available:
* `ffmpeg`
* Can decrypt and remux songs but can't decrypt music videos by itself
* Decryption may not work on older versions of FFmpeg
* Can be used without mp4decrypt only for songs and when using legacy song codecs
* `mp4box`
* Requires mp4decrypt
* Doesn't convert closed captions in music videos that have them
* Can be obtained from here: https://gpac.wp.imt.fr/downloads
### Download mode
### Download modes
The following download modes are available:
* `ytdlp`
* `nm3u8dlre`
@ -132,17 +131,47 @@ The following download modes are available:
* Requires FFmpeg
* Can be obtained from here: https://github.com/nilaoda/N_m3u8DL-RE/releases
## Music videos quality
Music videos will be downloaded in the highest quality available by default. The available qualities are:
* AVC 1080p 10mbps, AAC 256kbps
* AVC 1080p 6.5mbps, AAC 256kbps
* AVC 720p 4mbps, AAC 256kbps
* AVC 576p 2mbps, AAC 256kbps
* AVC 480p 1.5mbps, AAC 256kbps
* AVC 360p 1mbps, AAC 256kbps
By enabling the `prefer_hevc` option, music videos will be downloaded in the highest HEVC quality available. The available qualities are:
* HEVC 4K 20mbps, AAC 256kbps
* HEVC 4K 12mbps, AAC 256kbps
### Song codecs
The following codecs are available:
* `aac-legacy`
* `aac-he-legacy`
* `aac`
* `aac-he`
* `aac-binaural`
* `aac-downmix`
* `aac-he-binaural`
* `aac-he-downmix`
* `alac`
* `atmos`
**Support for non-legacy codecs are not guaranteed, as most of the songs cannot be decrypted when using non-legacy codecs.**
### Music videos codecs
The following codecs are available:
* `h264-best` (with AAC 256kbps, up to 1080p)
* `h265-best` (With AAC 256kpbs, up to 2160p)
* `ask`
* When using this option, the script will ask you which audio and video codec to use.
### Post videos/extra videos qualities
The following qualities are available:
* `best` (with AAC 256kbps, up to 1080p)
* `ask`
* When using this option, the script will ask you which video quality to use.
Post videos doesn't require remuxing and are limited to `ytdlp` download mode.
### Synced lyrics formats
The following synced lyrics formats are available:
* `lrc`
* `srt`
* `ttml`
* Native format for Apple Music synced lyrics.
* Highly unsupported by media players.
### Cover formats
The following cover formats are available:
* `jpg`
* `png`
Enable `ask_video_format` to select a custom audio/video format.

View File

@ -1 +1 @@
__version__ = "1.9.11"
__version__ = "2.0"

239
gamdl/apple_music_api.py Normal file
View File

@ -0,0 +1,239 @@
from __future__ import annotations
import functools
import re
import time
from http.cookiejar import MozillaCookieJar
from pathlib import Path
import requests
class AppleMusicApi:
APPLE_MUSIC_HOMEPAGE_URL = "https://beta.music.apple.com"
AMP_API_URL = "https://amp-api.music.apple.com"
WEBPLAYBACK_API_URL = (
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback"
)
LICENSE_API_URL = "https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/acquireWebPlaybackLicense"
WAIT_TIME = 2
def __init__(
self,
cookies_path: Path | None = Path("./cookies.txt"),
storefront: None | str = None,
language: str = "en-US",
):
self.cookies_path = cookies_path
self.storefront = storefront
self.language = language
self._set_session()
def _set_session(self):
self.session = requests.Session()
if self.cookies_path:
cookies = MozillaCookieJar(self.cookies_path)
cookies.load(ignore_discard=True, ignore_expires=True)
self.session.cookies.update(cookies)
self.storefront = self.session.cookies.get_dict()["itua"]
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0",
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"content-type": "application/json",
"Media-User-Token": self.session.cookies.get_dict().get(
"media-user-token", ""
),
"x-apple-renewal": "true",
"DNT": "1",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
"origin": self.APPLE_MUSIC_HOMEPAGE_URL,
}
)
home_page = self.session.get(self.APPLE_MUSIC_HOMEPAGE_URL).text
index_js_uri = re.search(
r"/(assets/index-legacy-[^/]+\.js)",
home_page,
).group(1)
index_js_page = self.session.get(
f"{self.APPLE_MUSIC_HOMEPAGE_URL}/{index_js_uri}"
).text
token = re.search('(?=eyJh)(.*?)(?=")', index_js_page).group(1)
self.session.headers.update({"authorization": f"Bearer {token}"})
self.session.params = {"l": self.language}
@staticmethod
def _raise_response_exception(response: requests.Response):
raise Exception(
f"Request failed with status code {response.status_code}: {response.text}"
)
def _check_amp_api_response(self, response: requests.Response):
try:
response.raise_for_status()
response_dict = response.json()
assert response_dict.get("data")
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
AssertionError,
):
self._raise_response_exception(response)
def get_song(
self,
song_id: str,
extend: str = "extendedAssetUrls",
include: str = "lyrics,albums",
) -> dict:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/songs/{song_id}",
params={
"include": include,
"extend": extend,
},
)
self._check_amp_api_response(response)
return response.json()["data"][0]
def get_music_video(
self,
music_video_id: str,
include: str = "albums",
) -> dict:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/music-videos/{music_video_id}",
params={
"include": include,
},
)
self._check_amp_api_response(response)
return response.json()["data"][0]
def get_post(
self,
post_id: str,
) -> dict:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/uploaded-videos/{post_id}"
)
self._check_amp_api_response(response)
return response.json()["data"][0]
@functools.lru_cache()
def get_album(
self,
album_id: str,
extend: str = "extendedAssetUrls",
) -> dict:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/albums/{album_id}",
params={
"extend": extend,
},
)
self._check_amp_api_response(response)
return response.json()["data"][0]
def get_playlist(
self,
playlist_id: str,
is_library: bool = False,
limit_tracks: int = 300,
extend: str = "extendedAssetUrls",
full_playlist: bool = True,
) -> dict:
response = self.session.get(
f"{self.AMP_API_URL}/v1/{'me' if is_library else 'catalog'}/{self.storefront}/playlists/{playlist_id}",
params={
"extend": extend,
"limit[tracks]": limit_tracks,
},
)
self._check_amp_api_response(response)
playlist = response.json()["data"][0]
if full_playlist:
playlist = self._extend_playlists_tracks(playlist, limit_tracks)
return playlist
def _extend_playlists_tracks(
self,
playlist: dict,
limit_tracks: int,
) -> dict:
playlist_next_uri = playlist["relationships"]["tracks"].get("next")
while playlist_next_uri:
playlist_next = self._get_playlist_next(playlist_next_uri, limit_tracks)
playlist["relationships"]["tracks"]["data"].extend(playlist_next["data"])
playlist_next_uri = playlist_next.get("next")
time.sleep(self.WAIT_TIME)
return playlist
def _get_playlist_next(self, playlist_next_uri: str, limit_tracks: int) -> dict:
response = self.session.get(
self.AMP_API_URL + playlist_next_uri,
params={
"limit[tracks]": limit_tracks,
},
)
self._check_amp_api_response(response)
return response.json()
def get_webplayback(
self,
track_id: str,
) -> dict:
response = self.session.post(
self.WEBPLAYBACK_API_URL,
json={
"salableAdamId": track_id,
"language": self.language,
},
)
try:
response.raise_for_status()
response_dict = response.json()
webplayback = response_dict.get("songList")
assert webplayback
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
AssertionError,
):
self._raise_response_exception(response)
return webplayback[0]
def get_widevine_license(
self,
track_id: str,
track_uri: str,
challenge: str,
) -> str:
response = self.session.post(
self.LICENSE_API_URL,
json={
"challenge": challenge,
"key-system": "com.widevine.alpha",
"uri": track_uri,
"adamId": track_id,
"isLibrary": False,
"user-initiated": True,
},
)
try:
response.raise_for_status()
response_dict = response.json()
widevine_license = response_dict.get("license")
assert widevine_license
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
AssertionError,
):
self._raise_response_exception(response)
return widevine_license

File diff suppressed because it is too large Load Diff

View File

@ -1,3 +1,5 @@
from gamdl.enums import MusicVideoCodec, SongCodec, SyncedLyricsFormat
STOREFRONT_IDS = {
"AE": "143481-2,32",
"AG": "143540-2,32",
@ -182,10 +184,33 @@ MP4_TAGS_MAP = {
"xid": "xid ",
}
SONG_CODEC_REGEX_MAP = {
SongCodec.AAC: r"audio-stereo-\d+",
SongCodec.AAC_HE: r"audio-HE-stereo-\d+",
SongCodec.AAC_BINAURAL: r"audio-stereo-\d+-binaural",
SongCodec.AAC_DOWNMIX: r"audio-stereo-\d+-downmix",
SongCodec.AAC_HE_BINAURAL: r"audio-HE-stereo-\d+-binaural",
SongCodec.AAC_HE_DOWNMIX: r"audio-HE-stereo-\d+-downmix",
SongCodec.ALAC: r"audio-alac-.*",
SongCodec.ATMOS: r"audio-atmos-.*",
}
MUSIC_VIDEO_CODEC_MAP = {
MusicVideoCodec.H264_BEST: "avc1",
MusicVideoCodec.H265_BEST: "hvc1",
}
SYNCED_LYRICS_FILE_EXTENSION_MAP = {
SyncedLyricsFormat.LRC: ".lrc",
SyncedLyricsFormat.SRT: ".srt",
SyncedLyricsFormat.TTML: ".ttml",
}
EXCLUDED_CONFIG_FILE_PARAMS = (
"urls",
"config_location",
"url_txt",
"config_path",
"read_urls_as_txt",
"no_config_file",
"version",
"help",
@ -193,4 +218,4 @@ EXCLUDED_CONFIG_FILE_PARAMS = (
X_NOT_FOUND_STRING = '{} not found at "{}"'
AMP_API_HOSTNAME = "https://amp-api.music.apple.com"
AMP_API_HOSTNAME = "https://amp-api.music.apple.com"

View File

@ -1,541 +1,223 @@
from __future__ import annotations
import base64
import datetime
import functools
import re
import shutil
import subprocess
from http.cookiejar import MozillaCookieJar
from pathlib import Path
from time import sleep
from xml.etree import ElementTree
import ciso8601
import m3u8
import requests
from mutagen.mp4 import MP4, MP4Cover
from pywidevine import PSSH, Cdm, Device
from pywidevine.license_protocol_pb2 import WidevinePsshData
from yt_dlp import YoutubeDL
from gamdl.constants import AMP_API_HOSTNAME, MP4_TAGS_MAP, STOREFRONT_IDS
from .apple_music_api import AppleMusicApi
from .constants import MP4_TAGS_MAP
from .enums import CoverFormat, DownloadMode, RemuxMode
from .hardcoded_wvd import HARDCODED_WVD
from .itunes_api import ItunesApi
from .models import DownloadQueueItem, UrlInfo
class Downloader:
ILLEGAL_CHARACTERS_REGEX = r'[\\/:*?"<>|;]'
def __init__(
self,
final_path: Path = None,
temp_path: Path = None,
cookies_location: Path = None,
wvd_location: Path = None,
ffmpeg_location: str = None,
mp4box_location: str = None,
mp4decrypt_location: str = None,
nm3u8dlre_location: str = None,
template_folder_album: str = None,
template_folder_compilation: str = None,
template_file_single_disc: str = None,
template_file_multi_disc: str = None,
template_folder_music_video: str = None,
template_file_music_video: str = None,
template_date: str = None,
cover_size: int = None,
cover_format: str = None,
apple_music_api: AppleMusicApi,
itunes_api: ItunesApi,
output_path: Path = Path("./Apple Music"),
temp_path: Path = Path("./temp"),
wvd_path: Path = None,
nm3u8dlre_path: str = "N_m3u8dl-RE",
mp4decrypt_path: str = "mp4decrypt",
ffmpeg_path: str = "ffmpeg",
mp4box_path: str = "MP4Box",
download_mode: DownloadMode = DownloadMode.YTDLP,
remux_mode: RemuxMode = RemuxMode.FFMPEG,
cover_format: CoverFormat = CoverFormat.JPG,
template_folder_album: str = "{album_artist}/{album}",
template_folder_compilation: str = "Compilations/{album}",
template_file_single_disc: str = "{track:02d} {title}",
template_file_multi_disc: str = "{disc}-{track:02d} {title}",
template_folder_no_album: str = "{artist}/Unknown Album",
template_file_no_album: str = "{title}",
template_date: str = "%Y-%m-%dT%H:%M:%SZ",
exclude_tags: str = None,
truncate: int = None,
prefer_hevc: bool = None,
prefer_account_language: bool = None,
ask_video_format: bool = None,
songs_heaac: bool = None,
**kwargs,
cover_size: int = 1200,
truncate: int = 40,
no_progress: bool = False,
):
self.final_path = final_path
self.apple_music_api = apple_music_api
self.itunes_api = itunes_api
self.output_path = output_path
self.temp_path = temp_path
self.cookies_location = cookies_location
self.wvd_location = wvd_location
self.ffmpeg_location = (
shutil.which(ffmpeg_location) if ffmpeg_location else None
)
self.mp4box_location = (
shutil.which(mp4box_location) if mp4box_location else None
)
self.mp4decrypt_location = (
shutil.which(mp4decrypt_location) if mp4decrypt_location else None
)
self.nm3u8dlre_location = (
shutil.which(nm3u8dlre_location) if nm3u8dlre_location else None
)
self.wvd_path = wvd_path
self.nm3u8dlre_path = nm3u8dlre_path
self.mp4decrypt_path = mp4decrypt_path
self.ffmpeg_path = ffmpeg_path
self.mp4box_path = mp4box_path
self.download_mode = download_mode
self.remux_mode = remux_mode
self.cover_format = cover_format
self.template_folder_album = template_folder_album
self.template_folder_compilation = template_folder_compilation
self.template_file_single_disc = template_file_single_disc
self.template_file_multi_disc = template_file_multi_disc
self.template_folder_music_video = template_folder_music_video
self.template_file_music_video = template_file_music_video
self.template_folder_no_album = template_folder_no_album
self.template_file_no_album = template_file_no_album
self.template_date = template_date
self.exclude_tags = exclude_tags
self.cover_size = cover_size
self.cover_format = cover_format
self.exclude_tags = (
[i.lower() for i in exclude_tags.split(",")]
if exclude_tags is not None
self.truncate = truncate
self.no_progress = no_progress
self._set_binaries_path_full()
self._set_exclude_tags_list()
self._set_truncate()
def _set_binaries_path_full(self):
self.nm3u8dlre_path_full = shutil.which(self.nm3u8dlre_path)
self.ffmpeg_path_full = shutil.which(self.ffmpeg_path)
self.mp4box_path_full = shutil.which(self.mp4box_path)
self.mp4decrypt_path_full = shutil.which(self.mp4decrypt_path)
def _set_exclude_tags_list(self):
self.exclude_tags_list = (
[i.lower() for i in self.exclude_tags.split(",")]
if self.exclude_tags is not None
else []
)
self.truncate = None if truncate is not None and truncate < 4 else truncate
self.prefer_hevc = prefer_hevc
self.prefer_account_language = prefer_account_language
self.ask_video_format = ask_video_format
self.songs_flavor = "32:ctrp64" if songs_heaac else "28:ctrp256"
def setup_session(self) -> None:
cookies = MozillaCookieJar(self.cookies_location)
cookies.load(ignore_discard=True, ignore_expires=True)
self.session = requests.Session()
self.session.cookies.update(cookies)
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0",
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"content-type": "application/json",
"Media-User-Token": self.session.cookies.get_dict()["media-user-token"],
"x-apple-renewal": "true",
"DNT": "1",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
"origin": "https://beta.music.apple.com",
}
)
home_page = self.session.get("https://beta.music.apple.com").text
index_js_uri = re.search(r"/(assets/index-legacy-[^/]+\.js)", home_page).group(
1
)
index_js_page = self.session.get(
f"https://beta.music.apple.com/{index_js_uri}"
).text
token = re.search('(?=eyJh)(.*?)(?=")', index_js_page).group(1)
self.session.headers.update({"authorization": f"Bearer {token}"})
self.country = self.session.cookies.get_dict()["itua"]
self.storefront = STOREFRONT_IDS[self.country.upper()]
def _set_truncate(self):
self.truncate = None if self.truncate < 4 else self.truncate
def setup_cdm(self) -> None:
self.cdm = Cdm.from_device(Device.load(self.wvd_location))
self.cdm_session = self.cdm.open()
def set_cdm(self):
if self.wvd_path:
self.cdm = Cdm.from_device(Device.load(self.wvd_path))
else:
self.cdm = Cdm.from_device(Device.loads(HARDCODED_WVD))
def get_song(self, song_id: str) -> dict:
song_response = self.session.get(
f"{AMP_API_HOSTNAME}/v1/catalog/{self.country}/songs/{song_id}"
)
if song_response.status_code != 200:
raise Exception(f"Failed to get song: {song_response.text}")
return song_response.json()["data"][0]
def get_music_video(self, music_video_id: str) -> dict:
music_video_response = self.session.get(
f"{AMP_API_HOSTNAME}/v1/catalog/{self.country}/music-videos/{music_video_id}"
)
if music_video_response.status_code != 200:
raise Exception(f"Failed to get music video: {music_video_response.text}")
return music_video_response.json()["data"][0]
def get_album(self, album_id: str) -> dict:
album_response = self.session.get(
f"{AMP_API_HOSTNAME}/v1/catalog/{self.country}/albums/{album_id}"
)
if album_response.status_code != 200:
raise Exception(f"Failed to get album: {album_response.text}")
return album_response.json()["data"][0]
def get_playlist(self, playlist_id: str) -> dict:
playlist_response = self.session.get(
f"{AMP_API_HOSTNAME}/v1/catalog/{self.country}/playlists/{playlist_id}",
params={
"limit[tracks]": 300,
},
)
if playlist_response.status_code != 200:
raise Exception(f"Failed to get playlist: {playlist_response.text}")
return playlist_response.json()["data"][0]
def get_playlists_additional_tracks(self, next_uri) -> dict:
extending = True
additional_tracks = []
while extending:
playlist_tracks_response = self.session.get(f"{AMP_API_HOSTNAME}{next_uri}")
playlist_tracks_response_json = playlist_tracks_response.json()
extending = "next" in playlist_tracks_response_json
playlist_tracks_response_json_data = playlist_tracks_response_json["data"]
# Doing a push to the array we are going to download the songs from
additional_tracks.extend(playlist_tracks_response_json_data)
if extending:
next_uri = playlist_tracks_response_json["next"]
# I don't want to upset any kind of rate limits so lets give it a few seconds, if you have any clue on what the rate limits are adjust this to match them closely.
# 3 Seconds to be safe, we can afford it since we only need to do it once
sleep(3)
return additional_tracks
def get_download_queue(self, url: str) -> tuple[str, list[dict]]:
download_queue = []
def get_url_info(self, url: str) -> UrlInfo:
url_info = UrlInfo()
url_regex_result = re.search(
r"/([a-z]{2})/(album|playlist|song|music-video)/(.*)/([a-z]{2}\..*|[0-9]*)(?:\?i=)?([0-9a-z]*)",
r"/([a-z]{2})/(album|playlist|song|music-video|post)/([^/]*)(?:/([^/?]*))?(?:\?i=)?([0-9a-z]*)?",
url,
)
catalog_resource_type = url_regex_result.group(2)
catalog_id = url_regex_result.group(5) or url_regex_result.group(4)
if catalog_resource_type == "song" or url_regex_result.group(5):
download_queue.append(self.get_song(catalog_id))
elif catalog_resource_type == "music-video":
download_queue.append(self.get_music_video(catalog_id))
elif catalog_resource_type == "album":
url_info.storefront = url_regex_result.group(1)
url_info.type = (
"song" if url_regex_result.group(5) else url_regex_result.group(2)
)
url_info.id = (
url_regex_result.group(5)
or url_regex_result.group(4)
or url_regex_result.group(3)
)
return url_info
def get_download_queue(self, url_info: UrlInfo) -> list[DownloadQueueItem]:
return self._get_download_queue(url_info.type, url_info.id)
def _get_download_queue(self, url_type: str, id: str) -> list[DownloadQueueItem]:
download_queue = []
if url_type == "song":
download_queue.append(DownloadQueueItem(self.apple_music_api.get_song(id)))
elif url_type == "album":
album = self.apple_music_api.get_album(id)
download_queue.extend(
self.get_album(catalog_id)["relationships"]["tracks"]["data"]
DownloadQueueItem(track)
for track in album["relationships"]["tracks"]["data"]
)
elif catalog_resource_type == "playlist":
playlist = self.get_playlist(catalog_id)
tracks_response = playlist["relationships"]["tracks"]
download_queue.extend(tracks_response["data"])
if "next" not in tracks_response:
return catalog_resource_type, download_queue
elif url_type == "playlist":
download_queue.extend(
self.get_playlists_additional_tracks(tracks_response["next"])
DownloadQueueItem(track)
for track in self.apple_music_api.get_playlist(id)["relationships"][
"tracks"
]["data"]
)
return catalog_resource_type, download_queue
else:
raise Exception("Invalid URL")
return catalog_resource_type, download_queue
def get_webplayback(self, track_id: str) -> dict:
webplayback_response = self.session.post(
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback",
json={
"salableAdamId": track_id,
"language": "" if self.prefer_account_language else "en-US",
},
)
if webplayback_response.status_code != 200:
raise Exception(f"Failed to get webplayback: {webplayback_response.text}")
return webplayback_response.json()["songList"][0]
def get_stream_url_song(self, webplayback: dict) -> str:
return next(
i for i in webplayback["assets"] if i["flavor"] == self.songs_flavor
)["URL"]
def get_stream_url_music_video(self, webplayback: dict) -> tuple[str, str]:
ydl = YoutubeDL(
{
"allow_unplayable_formats": True,
"quiet": True,
"no_warnings": True,
"allowed_extractors": ["generic"],
}
)
playlist = ydl.extract_info(
webplayback["hls-playlist-url"].replace("&aec=HD", ""),
download=False,
)
if self.ask_video_format:
ydl.list_formats(playlist)
stream_url_video = None
stream_url_audio = None
while stream_url_video is None or stream_url_audio is None:
format_ids = input("Enter video and audio id: ").split()
if len(format_ids) != 2:
continue
video_id, audio_id = format_ids
matching_formats = [
i
for i in playlist["formats"]
if i["format_id"] in (video_id, audio_id)
]
stream_url_video = next(
(i["url"] for i in matching_formats if i["video_ext"] != "none"),
None,
)
stream_url_audio = next(
(i["url"] for i in matching_formats if i["audio_ext"] != "none"),
None,
)
else:
if self.prefer_hevc:
stream_url_video = playlist["formats"][-1]["url"]
else:
stream_url_video = list(
i["url"]
for i in playlist["formats"]
if i["video_ext"] != "none" and "avc1" in i["vcodec"]
)[-1]
stream_url_audio = next(
i["url"]
for i in playlist["formats"]
if "audio-stereo-256" in i["format_id"]
elif url_type == "music-video":
download_queue.append(
DownloadQueueItem(self.apple_music_api.get_music_video(id))
)
return stream_url_video, stream_url_audio
elif url_type == "post":
download_queue.append(DownloadQueueItem(self.apple_music_api.get_post(id)))
else:
raise Exception(f"Invalid url type: {url_type}")
return download_queue
def get_encrypted_location_video(self, track_id: str) -> Path:
return self.temp_path / f"{track_id}_encrypted_video.mp4"
def sanitize_date(self, date: str):
datetime_obj = ciso8601.parse_datetime(date)
return datetime_obj.strftime(self.template_date)
def get_encrypted_location_audio(self, track_id: str) -> Path:
return self.temp_path / f"{track_id}_encrypted_audio.m4a"
def get_decryption_key(self, pssh: str, track_id: str) -> str:
pssh_obj = PSSH(pssh.split(",")[-1])
cdm_session = self.cdm.open()
challenge = base64.b64encode(
self.cdm.get_license_challenge(cdm_session, pssh_obj)
).decode()
license = self.apple_music_api.get_widevine_license(
track_id,
pssh,
challenge,
)
self.cdm.parse_license(cdm_session, license)
decryption_key = next(
i for i in self.cdm.get_keys(cdm_session) if i.type == "CONTENT"
).key.hex()
self.cdm.close(cdm_session)
return decryption_key
def get_decrypted_location_video(self, track_id: str) -> Path:
return self.temp_path / f"{track_id}_decrypted_video.mp4"
def download(self, path: Path, stream_url: str):
if self.download_mode == DownloadMode.YTDLP:
self.download_ytdlp(path, stream_url)
elif self.download_mode == DownloadMode.NM3U8DLRE:
self.download_nm3u8dlre(path, stream_url)
def get_decrypted_location_audio(self, track_id: str) -> Path:
return self.temp_path / f"{track_id}_decrypted_audio.m4a"
def get_fixed_location(self, track_id: str, file_extension: str) -> Path:
return self.temp_path / f"{track_id}_fixed{file_extension}"
def get_cover_location_song(self, final_location: Path) -> Path:
return final_location.parent / f"Cover.{self.cover_format}"
def get_cover_location_music_video(self, final_location: Path) -> Path:
return final_location.with_suffix(f".{self.cover_format}")
def get_lrc_location(self, final_location: Path) -> Path:
return final_location.with_suffix(".lrc")
def download_ytdlp(self, encrypted_location: Path, stream_url: str) -> None:
def download_ytdlp(self, path: Path, stream_url: str):
with YoutubeDL(
{
"quiet": True,
"no_warnings": True,
"outtmpl": str(encrypted_location),
"outtmpl": str(path),
"allow_unplayable_formats": True,
"fixup": "never",
"allowed_extractors": ["generic"],
"noprogress": self.no_progress,
}
) as ydl:
ydl.download(stream_url)
def download_nm3u8dlre(self, encrypted_location: Path, stream_url: str) -> None:
def download_nm3u8dlre(self, path: Path, stream_url: str):
if self.no_progress:
subprocess_additional_args = {
"stdout": subprocess.DEVNULL,
"stderr": subprocess.DEVNULL,
}
else:
subprocess_additional_args = {}
path.parent.mkdir(parents=True, exist_ok=True)
subprocess.run(
[
self.nm3u8dlre_location,
self.nm3u8dlre_path_full,
stream_url,
"--binary-merge",
"--no-log",
"--log-level",
"off",
"--ffmpeg-binary-path",
self.ffmpeg_location,
self.ffmpeg_path_full,
"--save-name",
encrypted_location.stem,
path.stem,
"--save-dir",
encrypted_location.parent,
path.parent,
"--tmp-dir",
encrypted_location.parent,
path.parent,
],
check=True,
**subprocess_additional_args,
)
def get_license_b64(self, challenge: str, track_uri: str, track_id: str) -> str:
license_b64_response = self.session.post(
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/acquireWebPlaybackLicense",
json={
"challenge": challenge,
"key-system": "com.widevine.alpha",
"uri": track_uri,
"adamId": track_id,
"isLibrary": False,
"user-initiated": True,
},
)
if license_b64_response.status_code != 200:
raise Exception(f"Failed to get license_b64: {license_b64_response.text}")
return license_b64_response.json()["license"]
def get_decryption_key_music_video(self, stream_url: str, track_id: str) -> str:
playlist = m3u8.load(stream_url)
track_uri = next(
i
for i in playlist.keys
if i.keyformat == "urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed"
).uri
pssh = PSSH(track_uri.split(",")[1])
challenge = base64.b64encode(
self.cdm.get_license_challenge(self.cdm_session, pssh)
).decode()
license_b64 = self.get_license_b64(challenge, track_uri, track_id)
self.cdm.parse_license(self.cdm_session, license_b64)
return next(
i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT"
).key.hex()
def get_decryption_key_song(self, stream_url: str, track_id: str) -> str:
track_uri = m3u8.load(stream_url).keys[0].uri
widevine_pssh_data = WidevinePsshData()
widevine_pssh_data.algorithm = 1
widevine_pssh_data.key_ids.append(base64.b64decode(track_uri.split(",")[1]))
pssh = PSSH(base64.b64encode(widevine_pssh_data.SerializeToString()).decode())
challenge = base64.b64encode(
self.cdm.get_license_challenge(self.cdm_session, pssh)
).decode()
license_b64 = self.get_license_b64(challenge, track_uri, track_id)
self.cdm.parse_license(self.cdm_session, license_b64)
return next(
i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT"
).key.hex()
def get_lyrics_synced_timestamp_lrc(self, timestamp_ttml: str) -> str:
mins_secs_ms = re.findall(r"\d+", timestamp_ttml)
ms, secs, mins = 0, 0, 0
if len(mins_secs_ms) == 2 and ":" in timestamp_ttml:
secs, mins = int(mins_secs_ms[-1]), int(mins_secs_ms[-2])
elif len(mins_secs_ms) == 1:
ms = int(mins_secs_ms[-1])
else:
secs = float(f"{mins_secs_ms[-2]}.{mins_secs_ms[-1]}")
if len(mins_secs_ms) > 2:
mins = int(mins_secs_ms[-3])
timestamp_lrc = datetime.datetime.fromtimestamp(
(mins * 60) + secs + (ms / 1000)
)
ms_new = timestamp_lrc.strftime("%f")[:-3]
if int(ms_new[-1]) >= 5:
ms = int(f"{int(ms_new[:2]) + 1}") * 10
timestamp_lrc += datetime.timedelta(milliseconds=ms) - datetime.timedelta(
microseconds=timestamp_lrc.microsecond
)
return timestamp_lrc.strftime("%M:%S.%f")[:-4]
def get_lyrics(self, track_id: str) -> tuple[str, str]:
lyrics = self.session.get(
f"{AMP_API_HOSTNAME}/v1/catalog/{self.country}/songs/{track_id}/lyrics"
).json()
if lyrics["data"][0].get("attributes") is None:
return None, None
lyrics_ttml = ElementTree.fromstring(lyrics["data"][0]["attributes"]["ttml"])
lyrics_unsynced = ""
lyrics_synced = ""
for div in lyrics_ttml.iter("{http://www.w3.org/ns/ttml}div"):
for p in div.iter("{http://www.w3.org/ns/ttml}p"):
if p.attrib.get("begin"):
lyrics_synced += f'[{self.get_lyrics_synced_timestamp_lrc(p.attrib.get("begin"))}]{p.text}\n'
if p.text is not None:
lyrics_unsynced += p.text + "\n"
lyrics_unsynced += "\n"
return lyrics_unsynced[:-2], lyrics_synced
def get_cover_url(self, webplayback: dict) -> str:
return (
webplayback["artwork-urls"]["default"]["url"].rsplit("/", 1)[0]
+ f"/{self.cover_size}x{self.cover_size}bb.{self.cover_format}"
)
@functools.lru_cache()
def get_cover(self, cover_url: str) -> bytes:
return requests.get(cover_url).content
def get_tags_song(self, webplayback: dict, lyrics_unsynced: str) -> dict:
flavor = next(
i for i in webplayback["assets"] if i["flavor"] == self.songs_flavor
)
metadata = flavor["metadata"]
tags = {
"album": metadata["playlistName"],
"album_artist": metadata["playlistArtistName"],
"album_id": int(metadata["playlistId"]),
"album_sort": metadata["sort-album"],
"artist": metadata["artistName"],
"artist_id": int(metadata["artistId"]),
"artist_sort": metadata["sort-artist"],
"comments": metadata.get("comments"),
"compilation": metadata["compilation"],
"composer": metadata.get("composerName"),
"composer_id": (
int(metadata.get("composerId")) if metadata.get("composerId") else None
),
"composer_sort": metadata.get("sort-composer"),
"copyright": metadata.get("copyright"),
"date": (
self.sanitize_date(metadata["releaseDate"], self.template_date)
if metadata.get("releaseDate")
else None
),
"disc": metadata["discNumber"],
"disc_total": metadata["discCount"],
"gapless": metadata["gapless"],
"genre": metadata["genre"],
"genre_id": metadata["genreId"],
"lyrics": lyrics_unsynced if lyrics_unsynced else None,
"media_type": 1,
"rating": metadata["explicit"],
"storefront": metadata["s"],
"title": metadata["itemName"],
"title_id": int(metadata["itemId"]),
"title_sort": metadata["sort-name"],
"track": metadata["trackNumber"],
"track_total": metadata["trackCount"],
"xid": metadata.get("xid"),
}
return tags
def get_tags_music_video(self, track_id: str) -> dict:
metadata_response = requests.get(
f"https://itunes.apple.com/lookup",
params={
"id": track_id,
"entity": "album",
"country": self.country,
"lang": "" if self.prefer_account_language else "en_US",
},
)
if metadata_response.status_code != 200:
raise Exception(f"Failed to get metadata: {metadata_response.text}")
metadata = metadata_response.json()["results"]
extra_metadata_response = requests.get(
f'https://music.apple.com/music-video/{metadata[0]["trackId"]}',
headers={"X-Apple-Store-Front": f"{self.storefront} t:music31"},
)
if extra_metadata_response.status_code != 200:
raise Exception(
f"Failed to get extra metadata: {extra_metadata_response.text}"
)
extra_metadata = extra_metadata_response.json()["storePlatformData"][
"product-dv"
]["results"][str(metadata[0]["trackId"])]
tags = {
"artist": metadata[0]["artistName"],
"artist_id": metadata[0]["artistId"],
"copyright": extra_metadata.get("copyright"),
"date": self.sanitize_date(metadata[0]["releaseDate"], self.template_date),
"genre": metadata[0]["primaryGenreName"],
"genre_id": int(extra_metadata["genres"][0]["genreId"]),
"media_type": 6,
"storefront": int(self.storefront.split("-")[0]),
"title": metadata[0]["trackCensoredName"],
"title_id": metadata[0]["trackId"],
}
if metadata[0]["trackExplicitness"] == "notExplicit":
tags["rating"] = 0
elif metadata[0]["trackExplicitness"] == "explicit":
tags["rating"] = 1
else:
tags["rating"] = 2
if len(metadata) > 1:
tags["album"] = metadata[1]["collectionCensoredName"]
tags["album_artist"] = metadata[1]["artistName"]
tags["album_id"] = metadata[1]["collectionId"]
tags["disc"] = metadata[0]["discNumber"]
tags["disc_total"] = metadata[0]["discCount"]
tags["track"] = metadata[0]["trackNumber"]
tags["track_total"] = metadata[0]["trackCount"]
return tags
def get_sanitized_string(self, dirty_string: str, is_folder: bool) -> str:
dirty_string = re.sub(r'[\\/:*?"<>|;]', "_", dirty_string)
dirty_string = re.sub(self.ILLEGAL_CHARACTERS_REGEX, "_", dirty_string)
if is_folder:
dirty_string = dirty_string[: self.truncate]
if dirty_string.endswith("."):
@ -545,197 +227,111 @@ class Downloader:
dirty_string = dirty_string[: self.truncate - 4]
return dirty_string.strip()
def get_final_location(self, tags: dict) -> Path:
def get_final_path(self, tags: dict, file_extension: str) -> Path:
if tags.get("album"):
final_location_folder = (
final_path_folder = (
self.template_folder_compilation.split("/")
if tags.get("compilation")
else self.template_folder_album.split("/")
)
final_location_file = (
final_path_file = (
self.template_file_multi_disc.split("/")
if tags["disc_total"] > 1
else self.template_file_single_disc.split("/")
)
else:
final_location_folder = self.template_folder_music_video.split("/")
final_location_file = self.template_file_music_video.split("/")
file_extension = ".m4a" if tags["media_type"] == 1 else ".m4v"
final_location_folder = [
self.get_sanitized_string(i.format(**tags), True)
for i in final_location_folder
final_path_folder = self.template_folder_no_album.split("/")
final_path_file = self.template_file_no_album.split("/")
final_path_folder = [
self.get_sanitized_string(i.format(**tags), True) for i in final_path_folder
]
final_location_file = [
final_path_file = [
self.get_sanitized_string(i.format(**tags), True)
for i in final_location_file[:-1]
for i in final_path_file[:-1]
] + [
self.get_sanitized_string(final_location_file[-1].format(**tags), False)
self.get_sanitized_string(final_path_file[-1].format(**tags), False)
+ file_extension
]
return self.final_path.joinpath(*final_location_folder).joinpath(
*final_location_file
return self.output_path.joinpath(*final_path_folder).joinpath(*final_path_file)
def get_cover_url(self, metadata: dict) -> str:
return self._get_cover_url(metadata["attributes"]["artwork"]["url"])
def _get_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"\{w\}x\{h\}([a-z]{2})\.jpg",
f"{self.cover_size}x{self.cover_size}bb.{self.cover_format.value}",
cover_url_template,
)
@staticmethod
def sanitize_date(date: str, template_date: str):
datetime_obj = ciso8601.parse_datetime(date)
return datetime_obj.strftime(template_date)
@functools.lru_cache()
def get_url_response_bytes(url: str) -> bytes:
return requests.get(url).content
def decrypt(
self, encrypted_location: Path, decrypted_location: Path, decryption_key: str
) -> None:
subprocess.run(
[
self.mp4decrypt_location,
encrypted_location,
"--key",
f"1:{decryption_key}",
decrypted_location,
],
check=True,
)
def fixup_song_mp4box(self, decrypted_location: Path, fixed_location: Path) -> None:
subprocess.run(
[
self.mp4box_location,
"-quiet",
"-add",
decrypted_location,
"-itags",
"artist=placeholder",
"-new",
fixed_location,
],
check=True,
)
def fixup_music_video_mp4box(
def apply_tags(
self,
decrypted_location_audio: Path,
decrypted_location_video: Path,
fixed_location: Path,
) -> None:
subprocess.run(
[
self.mp4box_location,
"-quiet",
"-add",
decrypted_location_audio,
"-add",
decrypted_location_video,
"-itags",
"artist=placeholder",
"-new",
fixed_location,
],
check=True,
)
def fixup_song_ffmpeg(
self, encrypted_location: Path, decryption_key: str, fixed_location: Path
) -> None:
subprocess.run(
[
self.ffmpeg_location,
"-loglevel",
"error",
"-y",
"-decryption_key",
decryption_key,
"-i",
encrypted_location,
"-movflags",
"+faststart",
"-c",
"copy",
fixed_location,
],
check=True,
)
def fixup_music_video_ffmpeg(
self,
decrypted_location_video: Path,
decrypted_location_audio: Path,
fixed_location: Path,
) -> None:
subprocess.run(
[
self.ffmpeg_location,
"-loglevel",
"error",
"-y",
"-i",
decrypted_location_video,
"-i",
decrypted_location_audio,
"-movflags",
"+faststart",
"-f",
"mp4",
"-c",
"copy",
"-c:s",
"mov_text",
fixed_location,
],
check=True,
)
def apply_tags(self, fixed_location: Path, tags: dict, cover_url: str) -> None:
mp4_tags = {
v: [tags[k]]
for k, v in MP4_TAGS_MAP.items()
if k not in self.exclude_tags and tags.get(k) is not None
}
if not {"disc", "disc_total"} & set(self.exclude_tags) and "disc" in tags:
mp4_tags["disk"] = [[0, 0]]
if not {"track", "track_total"} & set(self.exclude_tags) and "track" in tags:
mp4_tags["trkn"] = [[0, 0]]
if "compilation" not in self.exclude_tags and "compilation" in tags:
mp4_tags["cpil"] = tags["compilation"]
if "cover" not in self.exclude_tags:
path: Path,
tags: dict,
cover_url: str,
):
to_apply_tags = [
tag_name
for tag_name in tags.keys()
if tag_name not in self.exclude_tags_list
]
mp4_tags = {}
for tag_name in to_apply_tags:
if tag_name in ("disc", "disc_total"):
if mp4_tags.get("disk") is None:
mp4_tags["disk"] = [[0, 0]]
if tag_name == "disc":
mp4_tags["disk"][0][0] = tags[tag_name]
elif tag_name == "disc_total":
mp4_tags["disk"][0][1] = tags[tag_name]
elif tag_name in ("track", "track_total"):
if mp4_tags.get("trkn") is None:
mp4_tags["trkn"] = [[0, 0]]
if tag_name == "track":
mp4_tags["trkn"][0][0] = tags[tag_name]
elif tag_name == "track_total":
mp4_tags["trkn"][0][1] = tags[tag_name]
elif tag_name == "compilation":
mp4_tags["cpil"] = tags["compilation"]
elif tag_name == "gapless":
mp4_tags["pgap"] = tags["gapless"]
elif (
MP4_TAGS_MAP.get(tag_name) is not None
and tags.get(tag_name) is not None
):
mp4_tags[MP4_TAGS_MAP[tag_name]] = [tags[tag_name]]
if "cover" not in self.exclude_tags_list:
mp4_tags["covr"] = [
MP4Cover(
self.get_cover(cover_url),
self.get_url_response_bytes(cover_url),
imageformat=(
MP4Cover.FORMAT_JPEG
if self.cover_format == "jpg"
if self.cover_format == CoverFormat.JPG
else MP4Cover.FORMAT_PNG
),
)
]
if "disc" not in self.exclude_tags and "disc" in tags:
mp4_tags["disk"][0][0] = tags["disc"]
if "disc_total" not in self.exclude_tags and "disc_total" in tags:
mp4_tags["disk"][0][1] = tags["disc_total"]
if "gapless" not in self.exclude_tags and "gapless" in tags:
mp4_tags["pgap"] = tags["gapless"]
if "track" not in self.exclude_tags and "track" in tags:
mp4_tags["trkn"][0][0] = tags["track"]
if "track_total" not in self.exclude_tags and "track_total" in tags:
mp4_tags["trkn"][0][1] = tags["track_total"]
mp4 = MP4(fixed_location)
mp4 = MP4(path)
mp4.clear()
mp4.update(mp4_tags)
mp4.save()
def move_to_final_location(
self, fixed_location: Path, final_location: Path
) -> None:
final_location.parent.mkdir(parents=True, exist_ok=True)
shutil.move(fixed_location, final_location)
def move_to_output_path(
self,
remuxed_path: Path,
final_path: Path,
):
final_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(remuxed_path, final_path)
@functools.lru_cache()
def save_cover(self, cover_location: Path, cover_url: str) -> None:
with open(cover_location, "wb") as f:
f.write(self.get_cover(cover_url))
def save_cover(self, cover_path: Path, cover_url: str):
cover_path.write_bytes(self.get_url_response_bytes(cover_url))
def save_lrc(self, lrc_location: Path, lyrics_synced: str) -> None:
lrc_location.parent.mkdir(parents=True, exist_ok=True)
with open(lrc_location, "w", encoding="utf8") as f:
f.write(lyrics_synced)
def cleanup_temp_path(self) -> None:
def cleanup_temp_path(self):
shutil.rmtree(self.temp_path)

View File

@ -0,0 +1,302 @@
import subprocess
import urllib.parse
from pathlib import Path
import click
import m3u8
from tabulate import tabulate
from .constants import MUSIC_VIDEO_CODEC_MAP
from .downloader import Downloader
from .enums import MusicVideoCodec, RemuxMode
from .models import StreamInfo
class DownloaderMusicVideo:
def __init__(
self,
downloader: Downloader,
codec: MusicVideoCodec = MusicVideoCodec.H264_BEST,
):
self.downloader = downloader
self.codec = codec
def get_stream_url_master(self, itunes_page: dict) -> str:
return itunes_page["offers"][0]["assets"][0]["hlsUrl"]
def get_m3u8_master_data(self, stream_url_master: str) -> dict:
url_parts = urllib.parse.urlparse(stream_url_master)
query = urllib.parse.parse_qs(url_parts.query, keep_blank_values=True)
query.update({"aec": "HD", "dsid": "1"})
stream_url_master_new = url_parts._replace(
query=urllib.parse.urlencode(query, doseq=True)
).geturl()
return m3u8.load(stream_url_master_new).data
def get_stream_url_video(
self,
playlists: list[dict],
):
playlists_filtered = [
playlist
for playlist in playlists
if playlist["stream_info"]["codecs"].startswith(
MUSIC_VIDEO_CODEC_MAP[self.codec]
)
]
if not playlists_filtered:
playlists_filtered = [
playlist
for playlist in playlists
if playlist["stream_info"]["codecs"].startswith(
MUSIC_VIDEO_CODEC_MAP[MusicVideoCodec.H264_BEST]
)
]
playlists_filtered.sort(key=lambda x: x["stream_info"]["bandwidth"])
return playlists_filtered[-1]["uri"]
def get_stream_url_video_from_user(
self,
playlists: list[dict],
):
table = [
[
i,
playlist["stream_info"]["codecs"],
playlist["stream_info"]["resolution"],
playlist["stream_info"]["bandwidth"],
]
for i, playlist in enumerate(playlists, 1)
]
print(tabulate(table))
try:
choice = (
click.prompt("Choose a video codec", type=click.IntRange(1, len(table)))
- 1
)
except click.exceptions.Abort:
raise KeyboardInterrupt()
return playlists[choice]["uri"]
def get_stream_url_audio(
self,
playlists: list[dict],
) -> str:
stream_url = next(
(
playlist
for playlist in playlists
if playlist["group_id"] == "audio-stereo-256"
),
None,
)["uri"]
return stream_url
def get_stream_url_audio_from_user(
self,
playlists: list[dict],
):
table = [
[
i,
playlist["group_id"],
]
for i, playlist in enumerate(playlists, 1)
]
print(tabulate(table))
try:
choice = (
click.prompt(
"Choose an audio codec", type=click.IntRange(1, len(table))
)
- 1
)
except click.exceptions.Abort:
raise KeyboardInterrupt()
return playlists[choice]["uri"]
def get_pssh(self, m3u8_data: dict):
return next(
(
key
for key in m3u8_data["keys"]
if key["keyformat"] == "urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed"
),
None,
)["uri"]
def get_stream_info_video(self, m3u8_master_data: dict) -> StreamInfo:
stream_info = StreamInfo()
if self.codec != MusicVideoCodec.ASK:
stream_info.stream_url = self.get_stream_url_video(
m3u8_master_data["playlists"]
)
else:
stream_info.stream_url = self.get_stream_url_video_from_user(
m3u8_master_data["playlists"]
)
m3u8_data = m3u8.load(stream_info.stream_url).data
stream_info.pssh = self.get_pssh(m3u8_data)
return stream_info
def get_stream_info_audio(self, m3u8_master_data: dict) -> StreamInfo:
stream_info = StreamInfo()
if self.codec != MusicVideoCodec.ASK:
stream_info.stream_url = self.get_stream_url_audio(
m3u8_master_data["media"]
)
else:
stream_info.stream_url = self.get_stream_url_audio_from_user(
m3u8_master_data["media"]
)
m3u8_data = m3u8.load(stream_info.stream_url).data
stream_info.pssh = self.get_pssh(m3u8_data)
return stream_info
def get_music_video_id_alt(self, metadata: dict) -> str:
return metadata["attributes"]["url"].split("/")[-1].split("?")[0]
def get_tags(
self,
itunes_page: dict,
m3u8_master_data: dict,
metadata: dict,
):
tags = {
"artist": metadata["attributes"]["artistName"],
"artist_id": int(itunes_page["artistId"]),
"copyright": itunes_page["copyright"],
"date": next(
(
session_data
for session_data in m3u8_master_data["session_data"]
if session_data["data_id"] == "com.apple.hls.release-date"
),
None,
)["value"],
"genre": metadata["attributes"]["genreNames"][0],
"genre_id": int(itunes_page["genres"][0]["genreId"]),
"media_type": 6,
"title": metadata["attributes"]["name"],
"title_id": int(metadata["id"]),
}
if metadata["attributes"].get("contentRating") == "clean":
tags["rating"] = 2
elif metadata["attributes"].get("contentRating") == "explicit":
tags["rating"] = 1
else:
tags["rating"] = 0
if itunes_page.get("collectionId"):
metadata_itunes = self.downloader.itunes_api.get_resource(itunes_page["id"])
album = self.downloader.apple_music_api.get_album(
itunes_page["collectionId"]
)
tags["album"] = album["attributes"]["name"]
tags["album_artist"] = album["attributes"]["artistName"]
tags["album_id"] = int(itunes_page["collectionId"])
tags["disc"] = metadata_itunes[0]["discNumber"]
tags["disc_total"] = metadata_itunes[0]["discCount"]
tags["compilation"] = album["attributes"]["isCompilation"]
tags["track"] = metadata_itunes[0]["trackNumber"]
tags["track_total"] = metadata_itunes[0]["trackCount"]
return tags
def get_encrypted_path_video(self, track_id: str) -> str:
return self.downloader.temp_path / f"encrypted_{track_id}.mp4"
def get_encrypted_path_audio(self, track_id: str) -> str:
return self.downloader.temp_path / f"encrypted_{track_id}.m4a"
def get_decrypted_path_video(self, track_id: str) -> str:
return self.downloader.temp_path / f"decrypted_{track_id}.mp4"
def get_decrypted_path_audio(self, track_id: str) -> str:
return self.downloader.temp_path / f"decrypted_{track_id}.m4a"
def get_remuxed_path(self, track_id: str) -> str:
return self.downloader.temp_path / f"remuxed_{track_id}.m4v"
def decrypt(self, encrypted_path: Path, decryption_key: str, decrypted_path: Path):
subprocess.run(
[
self.downloader.mp4decrypt_path_full,
encrypted_path,
"--key",
f"1:{decryption_key}",
decrypted_path,
],
check=True,
)
def remux_mp4box(
self,
decrypted_path_audio: Path,
decrypted_path_video: Path,
fixed_path: Path,
) -> None:
subprocess.run(
[
self.downloader.mp4box_path_full,
"-quiet",
"-add",
decrypted_path_audio,
"-add",
decrypted_path_video,
"-itags",
"artist=placeholder",
"-new",
fixed_path,
],
check=True,
)
def remux_ffmpeg(
self,
decrypted_path_video: Path,
decrypte_path_audio: Path,
fixed_path: Path,
):
subprocess.run(
[
self.downloader.ffmpeg_path_full,
"-loglevel",
"error",
"-y",
"-i",
decrypted_path_video,
"-i",
decrypte_path_audio,
"-movflags",
"+faststart",
"-f",
"mp4",
"-c",
"copy",
"-c:s",
"mov_text",
fixed_path,
],
check=True,
)
def remux(
self,
decrypted_path_video: Path,
decrypted_path_audio: Path,
remuxed_path: Path,
):
if self.downloader.remux_mode == RemuxMode.MP4BOX:
self.remux_mp4box(
decrypted_path_audio,
decrypted_path_video,
remuxed_path,
)
elif self.downloader.remux_mode == RemuxMode.FFMPEG:
self.remux_ffmpeg(
decrypted_path_video,
decrypted_path_audio,
remuxed_path,
)
def get_cover_path(self, final_path: Path) -> Path:
return final_path.with_suffix(f".{self.downloader.cover_format.value}")

71
gamdl/downloader_post.py Normal file
View File

@ -0,0 +1,71 @@
from pathlib import Path
import click
from .downloader import Downloader
from tabulate import tabulate
from .enums import PostQuality
class DownloaderPost:
QUALITY_RANK = [
"1080pHdVideo",
"720pHdVideo",
"sdVideoWithPlusAudio",
"sdVideo",
"sd480pVideo",
"provisionalUploadVideo",
]
def __init__(
self,
downloader: Downloader,
quality: PostQuality = PostQuality.BEST,
):
self.downloader = downloader
self.quality = quality
def get_stream_url_best(self, metadata: dict) -> str:
best_quality = next(
(
quality
for quality in self.QUALITY_RANK
if metadata["attributes"]["assetTokens"].get(quality)
),
None,
)
return metadata["attributes"]["assetTokens"][best_quality]
def get_stream_url_from_user(self, metadata: dict) -> str:
qualities = list(metadata["attributes"]["assetTokens"].keys())
table = [
[index, quality]
for index, quality in enumerate(
qualities,
start=1,
)
]
print(tabulate(table))
choice = (
click.prompt("Choose a quality", type=click.IntRange(1, len(table))) - 1
)
return metadata["attributes"]["assetTokens"][qualities[choice]]
def get_stream_url(self, metadata: dict) -> str:
if self.quality == PostQuality.BEST:
stream_url = self.get_stream_url_best(metadata)
elif self.quality == PostQuality.ASK:
stream_url = self.get_stream_url_from_user(metadata)
return stream_url
def get_tags(self, metadata: dict) -> list:
attributes = metadata["attributes"]
return {
"artist": attributes["artistName"],
"date": attributes["uploadDate"],
"title": attributes["name"],
"title_id": int(metadata["id"]),
}
def get_temp_path(self, track_id: str) -> Path:
return self.downloader.temp_path / f"{track_id}_temp.m4v"

346
gamdl/downloader_song.py Normal file
View File

@ -0,0 +1,346 @@
import base64
import datetime
import json
import re
import subprocess
from pathlib import Path
from xml.dom import minidom
from xml.etree import ElementTree
import click
import m3u8
from tabulate import tabulate
from .constants import SONG_CODEC_REGEX_MAP, SYNCED_LYRICS_FILE_EXTENSION_MAP
from .downloader import Downloader
from .enums import RemuxMode, SongCodec, SyncedLyricsFormat
from .models import Lyrics, StreamInfo
class DownloaderSong:
DEFAULT_DECRYPTION_KEY = "32b8ade1769e26b1ffb8986352793fc6"
def __init__(
self,
downloader: Downloader,
codec: SongCodec = SongCodec.AAC_LEGACY,
synced_lyrics_format: SyncedLyricsFormat = SyncedLyricsFormat.LRC,
):
self.downloader = downloader
self.codec = codec
self.synced_lyrics_format = synced_lyrics_format
def get_drm_infos(self, m3u8_data: dict) -> dict:
drm_info_raw = next(
(
session_data
for session_data in m3u8_data["session_data"]
if session_data["data_id"] == "com.apple.hls.AudioSessionKeyInfo"
),
None,
)
if not drm_info_raw:
raise Exception("DRM info not found")
return json.loads(base64.b64decode(drm_info_raw["value"]).decode("utf-8"))
def get_asset_infos(self, m3u8_data: dict) -> dict:
return json.loads(
base64.b64decode(
next(
session_data
for session_data in m3u8_data["session_data"]
if session_data["data_id"] == "com.apple.hls.audioAssetMetadata"
)["value"]
).decode("utf-8")
)
def get_playlist_from_codec(self, m3u8_data: dict) -> dict | None:
m3u8_master_playlists = [
playlist
for playlist in m3u8_data["playlists"]
if re.fullmatch(
SONG_CODEC_REGEX_MAP[self.codec], playlist["stream_info"]["audio"]
)
]
if not m3u8_master_playlists:
return None
m3u8_master_playlists.sort(key=lambda x: x["stream_info"]["average_bandwidth"])
return m3u8_master_playlists[-1]
def get_playlist_from_user(self, m3u8_data: dict) -> dict | None:
m3u8_master_playlists = [playlist for playlist in m3u8_data["playlists"]]
table = [
[i, playlist["stream_info"]["audio"]]
for i, playlist in enumerate(m3u8_master_playlists, 1)
]
print(tabulate(table))
try:
choice = (
click.prompt("Choose a codec", type=click.IntRange(1, len(table))) - 1
)
except click.exceptions.Abort:
raise KeyboardInterrupt()
return m3u8_master_playlists[choice]
def get_pssh(
self,
drm_infos: dict,
drm_ids: list,
) -> str | None:
drm_info = next(
(
drm_infos[drm_id]
for drm_id in drm_ids
if drm_infos[drm_id].get(
"urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed"
)
and drm_id != "1"
),
None,
)
if not drm_info:
return None
return drm_info["urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed"]["URI"]
def get_stream_info(self, track_metadata: dict) -> StreamInfo:
m3u8_url = track_metadata["attributes"]["extendedAssetUrls"]["enhancedHls"]
return self._get_stream_info(m3u8_url)
def _get_stream_info(self, m3u8_url: str) -> StreamInfo:
stream_info = StreamInfo()
m3u8_obj = m3u8.load(m3u8_url)
m3u8_data = m3u8_obj.data
drm_infos = self.get_drm_infos(m3u8_data)
asset_infos = self.get_asset_infos(m3u8_data)
if self.codec == SongCodec.ASK:
playlist = self.get_playlist_from_user(m3u8_data)
else:
playlist = self.get_playlist_from_codec(m3u8_data)
if playlist is None:
return stream_info
stream_info.stream_url = m3u8_obj.base_uri + playlist["uri"]
variant_id = playlist["stream_info"]["stable_variant_id"]
drm_ids = asset_infos[variant_id]["AUDIO-SESSION-KEY-IDS"]
pssh = self.get_pssh(drm_infos, drm_ids)
stream_info.pssh = pssh
return stream_info
@staticmethod
def parse_datetime_obj_from_timestamp_ttml(
timestamp_ttml: str,
) -> datetime.datetime:
mins_secs_ms = re.findall(r"\d+", timestamp_ttml)
ms, secs, mins = 0, 0, 0
if len(mins_secs_ms) == 2 and ":" in timestamp_ttml:
secs, mins = int(mins_secs_ms[-1]), int(mins_secs_ms[-2])
elif len(mins_secs_ms) == 1:
ms = int(mins_secs_ms[-1])
else:
secs = float(f"{mins_secs_ms[-2]}.{mins_secs_ms[-1]}")
if len(mins_secs_ms) > 2:
mins = int(mins_secs_ms[-3])
return datetime.datetime.fromtimestamp((mins * 60) + secs + (ms / 1000))
def get_lyrics_synced_timestamp_lrc(self, timestamp_ttml: str) -> str:
datetime_obj = self.parse_datetime_obj_from_timestamp_ttml(timestamp_ttml)
ms_new = datetime_obj.strftime("%f")[:-3]
if int(ms_new[-1]) >= 5:
ms = int(f"{int(ms_new[:2]) + 1}") * 10
datetime_obj += datetime.timedelta(milliseconds=ms) - datetime.timedelta(
microseconds=datetime_obj.microsecond
)
return datetime_obj.strftime("%M:%S.%f")[:-4]
def get_lyrics_synced_timestamp_srt(self, timestamp_ttml: str) -> str:
datetime_obj = self.parse_datetime_obj_from_timestamp_ttml(timestamp_ttml)
return datetime_obj.strftime("00:%M:%S,%f")[:-3]
def get_lyrics_synced_line_lrc(self, timestamp_ttml: str, text: str) -> str:
return f"[{self.get_lyrics_synced_timestamp_lrc(timestamp_ttml)}]{text}"
def get_lyrics_synced_line_srt(
self,
index: int,
timestamp_ttml_start: str,
timestamp_ttml_end: str,
text: str,
) -> str:
timestamp_srt_start = self.get_lyrics_synced_timestamp_srt(timestamp_ttml_start)
timestamp_srt_end = self.get_lyrics_synced_timestamp_srt(timestamp_ttml_end)
return f"{index}\n{timestamp_srt_start} --> {timestamp_srt_end}\n{text}\n"
def get_lyrics(self, track_metadata: dict) -> Lyrics:
if not track_metadata["attributes"]["hasLyrics"]:
return Lyrics()
elif track_metadata.get("relationships") is None:
track_metadata = self.downloader.apple_music_api.get_song(
track_metadata["id"]
)
if track_metadata["relationships"]["lyrics"]["data"]:
return self._get_lyrics(
track_metadata["relationships"]["lyrics"]["data"][0]["attributes"][
"ttml"
]
)
else:
return Lyrics()
def _get_lyrics(self, lyrics_ttml: str) -> Lyrics:
lyrics = Lyrics("", "")
lyrics_ttml_et = ElementTree.fromstring(lyrics_ttml)
index = 1
for div in lyrics_ttml_et.iter("{http://www.w3.org/ns/ttml}div"):
for p in div.iter("{http://www.w3.org/ns/ttml}p"):
if p.text is not None:
lyrics.unsynced += p.text + "\n"
if p.attrib.get("begin"):
if self.synced_lyrics_format == SyncedLyricsFormat.LRC:
lyrics.synced += f"{self.get_lyrics_synced_line_lrc(p.attrib.get('begin'), p.text)}\n"
elif self.synced_lyrics_format == SyncedLyricsFormat.SRT:
lyrics.synced += f"{self.get_lyrics_synced_line_srt(index, p.attrib.get('begin'), p.attrib.get('end'), p.text)}\n"
elif self.synced_lyrics_format == SyncedLyricsFormat.TTML:
if not lyrics.synced:
lyrics.synced = minidom.parseString(
lyrics_ttml
).toprettyxml()
continue
lyrics.synced += "\n"
index += 1
lyrics.unsynced += "\n"
lyrics.unsynced = lyrics.unsynced[:-2]
return lyrics
def get_tags(self, webplayback: dict, lyrics_unsynced: str) -> dict:
tags_raw = webplayback["assets"][0]["metadata"]
tags = {
"album": tags_raw["playlistName"],
"album_artist": tags_raw["playlistArtistName"],
"album_id": int(tags_raw["playlistId"]),
"album_sort": tags_raw["sort-album"],
"artist": tags_raw["artistName"],
"artist_id": int(tags_raw["artistId"]),
"artist_sort": tags_raw["sort-artist"],
"comments": tags_raw.get("comments"),
"compilation": tags_raw["compilation"],
"composer": tags_raw.get("composerName"),
"composer_id": (
int(tags_raw.get("composerId")) if tags_raw.get("composerId") else None
),
"composer_sort": tags_raw.get("sort-composer"),
"copyright": tags_raw.get("copyright"),
"date": (
self.downloader.sanitize_date(tags_raw["releaseDate"])
if tags_raw.get("releaseDate")
else None
),
"disc": tags_raw["discNumber"],
"disc_total": tags_raw["discCount"],
"gapless": tags_raw["gapless"],
"genre": tags_raw["genre"],
"genre_id": tags_raw["genreId"],
"lyrics": lyrics_unsynced if lyrics_unsynced else None,
"media_type": 1,
"rating": tags_raw["explicit"],
"storefront": tags_raw["s"],
"title": tags_raw["itemName"],
"title_id": int(tags_raw["itemId"]),
"title_sort": tags_raw["sort-name"],
"track": tags_raw["trackNumber"],
"track_total": tags_raw["trackCount"],
"xid": tags_raw.get("xid"),
}
return tags
def get_encrypted_path(self, track_id: str) -> Path:
return self.downloader.temp_path / f"{track_id}_encrypted.m4a"
def get_decrypted_path(self, track_id: str) -> Path:
return self.downloader.temp_path / f"{track_id}_decrypted.m4a"
def get_remuxed_path(self, track_id: str) -> Path:
return self.downloader.temp_path / f"{track_id}_remuxed.m4a"
def fix_key_id(self, encrypted_path: Path):
count = 0
with open(encrypted_path, "rb+") as file:
while data := file.read(4096):
pos = file.tell()
i = 0
while tenc := max(0, data.find(b"tenc", i)):
kid = tenc + 12
file.seek(max(0, pos - 4096) + kid, 0)
file.write(bytes.fromhex(f"{count:032}"))
count += 1
i = kid + 1
file.seek(pos, 0)
def decrypt(
self,
encrypted_path: Path,
decrypted_path: Path,
decryption_key: str,
):
self.fix_key_id(encrypted_path)
subprocess.run(
[
self.downloader.mp4decrypt_path_full,
encrypted_path,
"--key",
f"00000000000000000000000000000001:{decryption_key}",
"--key",
f"00000000000000000000000000000000:{self.DEFAULT_DECRYPTION_KEY}",
decrypted_path,
],
check=True,
)
def remux(self, decrypted_path: Path, remuxed_path: Path) -> None:
if self.downloader.remux_mode == RemuxMode.MP4BOX:
self.remux_mp4box(decrypted_path, remuxed_path)
elif self.downloader.remux_mode == RemuxMode.FFMPEG:
self.remux_ffmpeg(decrypted_path, remuxed_path)
def remux_mp4box(self, decrypted_path: Path, remuxed_path: Path) -> None:
subprocess.run(
[
self.downloader.mp4box_path_full,
"-quiet",
"-add",
decrypted_path,
"-itags",
"artist=placeholder",
"-new",
remuxed_path,
],
check=True,
)
def remux_ffmpeg(self, decrypted_path: Path, remuxed_path: Path) -> None:
subprocess.run(
[
self.downloader.ffmpeg_path_full,
"-loglevel",
"error",
"-y",
"-i",
decrypted_path,
"-c",
"copy",
"-movflags",
"+faststart",
remuxed_path,
],
check=True,
)
def get_lyrics_synced_path(self, final_path: Path) -> Path:
return final_path.with_suffix(
SYNCED_LYRICS_FILE_EXTENSION_MAP[self.synced_lyrics_format]
)
def get_cover_path(self, final_path: Path) -> Path:
return final_path.parent / f"Cover.{self.downloader.cover_format.value}"
def save_lyrics_synced(self, lyrics_synced_path: Path, lyrics_synced: str):
lyrics_synced_path.parent.mkdir(parents=True, exist_ok=True)
lyrics_synced_path.write_text(lyrics_synced, encoding="utf8")

View File

@ -0,0 +1,118 @@
import base64
import subprocess
from pathlib import Path
import m3u8
from pywidevine import PSSH
from pywidevine.license_protocol_pb2 import WidevinePsshData
from .downloader_song import DownloaderSong
from .enums import RemuxMode, SongCodec
from .models import StreamInfo
class DownloaderSongLegacy(DownloaderSong):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def get_stream_info(self, webplayback: dict) -> StreamInfo:
flavor = "32:ctrp64" if self.codec == SongCodec.AAC_HE_LEGACY else "28:ctrp256"
stream_info = StreamInfo()
stream_info.stream_url = next(
i for i in webplayback["assets"] if i["flavor"] == flavor
)["URL"]
m3u8_obj = m3u8.load(stream_info.stream_url)
stream_info.pssh = m3u8_obj.keys[0].uri
return stream_info
def get_decryption_key(self, pssh: str, track_id: str) -> str:
widevine_pssh_data = WidevinePsshData()
widevine_pssh_data.algorithm = 1
widevine_pssh_data.key_ids.append(base64.b64decode(pssh.split(",")[1]))
pssh_obj = PSSH(widevine_pssh_data.SerializeToString())
cdm_session = self.downloader.cdm.open()
challenge = base64.b64encode(
self.downloader.cdm.get_license_challenge(cdm_session, pssh_obj)
).decode()
license = self.downloader.apple_music_api.get_widevine_license(
track_id,
pssh,
challenge,
)
self.downloader.cdm.parse_license(cdm_session, license)
decryption_key = next(
i for i in self.downloader.cdm.get_keys(cdm_session) if i.type == "CONTENT"
).key.hex()
self.downloader.cdm.close(cdm_session)
return decryption_key
def decrypt(
self,
encrypted_path: Path,
decrypted_path: Path,
decryption_key: str,
):
self.fix_key_id(encrypted_path)
subprocess.run(
[
self.downloader.mp4decrypt_path_full,
encrypted_path,
"--key",
f"1:{decryption_key}",
decrypted_path,
],
check=True,
)
def remux_mp4box(self, decrypted_path: Path, remuxed_path: Path) -> None:
subprocess.run(
[
self.downloader.mp4box_path_full,
"-quiet",
"-add",
decrypted_path,
"-itags",
"artist=placeholder",
"-new",
remuxed_path,
],
check=True,
)
def remux_ffmpeg(
self,
decryption_key: str,
encrypted_path: Path,
remuxed_path: Path,
):
subprocess.run(
[
self.downloader.ffmpeg_path_full,
"-loglevel",
"error",
"-y",
"-decryption_key",
decryption_key,
"-i",
encrypted_path,
"-c",
"copy",
"-movflags",
"+faststart",
remuxed_path,
],
check=True,
)
def remux(
self,
encrypted_path: Path,
decrypted_path: Path,
remuxed_path: Path,
decryption_key: str,
):
if self.downloader.remux_mode == RemuxMode.FFMPEG:
self.remux_ffmpeg(decryption_key, encrypted_path, remuxed_path)
elif self.downloader.remux_mode == RemuxMode.MP4BOX:
self.decrypt(encrypted_path, decrypted_path, decryption_key)
self.remux_mp4box(decrypted_path, remuxed_path)

47
gamdl/enums.py Normal file
View File

@ -0,0 +1,47 @@
from enum import Enum
class DownloadMode(Enum):
YTDLP = "ytdlp"
NM3U8DLRE = "nm3u8dlre"
class RemuxMode(Enum):
FFMPEG = "ffmpeg"
MP4BOX = "mp4box"
class SongCodec(Enum):
AAC_LEGACY = "aac-legacy"
AAC_HE_LEGACY = "aac-he-legacy"
AAC = "aac"
AAC_HE = "aac-he"
AAC_BINAURAL = "aac-binaural"
AAC_DOWNMIX = "aac-downmix"
AAC_HE_BINAURAL = "aac-he-binaural"
AAC_HE_DOWNMIX = "aac-he-downmix"
ALAC = "alac"
ATMOS = "atmos"
ASK = "ask"
class SyncedLyricsFormat(Enum):
LRC = "lrc"
SRT = "srt"
TTML = "ttml"
class MusicVideoCodec(Enum):
H264_BEST = "h264-best"
H265_BEST = "h265-best"
ASK = "ask"
class PostQuality(Enum):
BEST = "best"
ASK = "ask"
class CoverFormat(Enum):
JPG = "jpg"
PNG = "png"

1
gamdl/hardcoded_wvd.py Normal file
View File

@ -0,0 +1 @@
HARDCODED_WVD = """V1ZEAgIDAASoMIIEpAIBAAKCAQEAwnCFAPXy4U1J7p1NohAS+xl040f5FBaE/59bPp301bGz0UGFT9VoEtY3vaeakKh/d319xTNvCSWsEDRaMmp/wSnMiEZUkkl04872jx2uHuR4k6KYuuJoqhsIo1TwUBueFZynHBUJzXQeW8Eb1tYAROGwp8W7r+b0RIjHC89RFnfVXpYlF5I6McktyzJNSOwlQbMqlVihfSUkv3WRd3HFmA0Oxay51CEIkoTlNTHVlzVyhov5eHCDSp7QENRgaaQ03jC/CcgFOoQymhsBtRCM0CQmfuAHjA9e77R6m/GJPy75G9fqoZM1RMzVDHKbKZPd3sFd0c0+77gLzW8cWEaaHwIDAQABAoIBAQCB2pN46MikHvHZIcTPDt0eRQoDH/YArGl2Lf7J+sOgU2U7wv49KtCug9IGHwDiyyUVsAFmycrF2RroV45FTUq0vi2SdSXV7Kjb20Ren/vBNeQw9M37QWmU8Sj7q6YyWb9hv5T69DHvvDTqIjVtbM4RMojAAxYti5hmjNIh2PrWfVYWhXxCQ/WqAjWLtZBM6Oww1byfr5I/wFogAKkgHi8wYXZ4LnIC8V7jLAhujlToOvMMC9qwcBiPKDP2FO+CPSXaqVhH+LPSEgLggnU3EirihgxovbLNAuDEeEbRTyR70B0lW19tLHixso4ZQa7KxlVUwOmrHSZf7nVuWqPpxd+BAoGBAPQLyJ1IeRavmaU8XXxfMdYDoc8+xB7v2WaxkGXb6ToX1IWPkbMz4yyVGdB5PciIP3rLZ6s1+ruuRRV0IZ98i1OuN5TSR56ShCGg3zkd5C4L/xSMAz+NDfYSDBdO8BVvBsw21KqSRUi1ctL7QiIvfedrtGb5XrE4zhH0gjXlU5qZAoGBAMv2segn0Jx6az4rqRa2Y7zRx4iZ77JUqYDBI8WMnFeR54uiioTQ+rOs3zK2fGIWlrn4ohco/STHQSUTB8oCOFLMx1BkOqiR+UyebO28DJY7+V9ZmxB2Guyi7W8VScJcIdpSOPyJFOWZQKXdQFW3YICD2/toUx/pDAJh1sEVQsV3AoGBANyyp1rthmvoo5cVbymhYQ08vaERDwU3PLCtFXu4E0Ow90VNn6Ki4ueXcv/gFOp7pISk2/yuVTBTGjCblCiJ1en4HFWekJwrvgg3Vodtq8Okn6pyMCHRqvWEPqD5hw6rGEensk0K+FMXnF6GULlfn4mgEkYpb+PvDhSYvQSGfkPJAoGAF/bAKFqlM/1eJEvU7go35bNwEiij9Pvlfm8y2L8Qj2lhHxLV240CJ6IkBz1Rl+S3iNohkT8LnwqaKNT3kVB5daEBufxMuAmOlOX4PmZdxDj/r6hDg8ecmjj6VJbXt7JDd/c5ItKoVeGPqu035dpJyE+1xPAY9CLZel4scTsiQTkCgYBt3buRcZMwnc4qqpOOQcXK+DWD6QvpkcJ55ygHYw97iP/lF4euwdHd+I5b+11pJBAao7G0fHX3eSjqOmzReSKboSe5L8ZLB2cAI8AsKTBfKHWmCa8kDtgQuI86fUfirCGdhdA9AVP2QXN2eNCuPnFWi0WHm4fYuUB5be2c18ucxAb9CAESmgsK3QMIAhIQ071yBlsbLoO2CSB9Ds0cmRif6uevBiKOAjCCAQoCggEBAMJwhQD18uFNSe6dTaIQEvsZdONH+RQWhP+fWz6d9NWxs9FBhU/VaBLWN72nmpCof3d9fcUzbwklrBA0WjJqf8EpzIhGVJJJdOPO9o8drh7keJOimLriaKobCKNU8FAbnhWcpxwVCc10HlvBG9bWAEThsKfFu6/m9ESIxwvPURZ31V6WJReSOjHJLcsyTUjsJUGzKpVYoX0lJL91kXdxxZgNDsWsudQhCJKE5TUx1Zc1coaL+Xhwg0qe0BDUYGmkNN4wvwnIBTqEMpobAbUQjNAkJn7gB4wPXu+0epvxiT8u+RvX6qGTNUTM1QxymymT3d7BXdHNPu+4C81vHFhGmh8CAwEAASjwIkgBUqoBCAEQABqBAQQlRbfiBNDb6eU6aKrsH5WJaYszTioXjPLrWN9dqyW0vwfT11kgF0BbCGkAXew2tLJJqIuD95cjJvyGUSN6VyhL6dp44fWEGDSBIPR0mvRq7bMP+m7Y/RLKf83+OyVJu/BpxivQGC5YDL9f1/A8eLhTDNKXs4Ia5DrmTWdPTPBL8SIgyfUtg3ofI+/I9Tf7it7xXpT0AbQBJfNkcNXGpO3JcBMSgAIL5xsXK5of1mMwAl6ygN1Gsj4aZ052otnwN7kXk12SMsXheWTZ/PYh2KRzmt9RPS1T8hyFx/Kp5VkBV2vTAqqWrGw/dh4URqiHATZJUlhO7PN5m2Kq1LVFdXjWSzP5XBF2S83UMe+YruNHpE5GQrSyZcBqHO0QrdPcU35GBT7S7+IJr2AAXvnjqnb8yrtpPWN2ZW/IWUJN2z4vZ7/HV4aj3OZhkxC1DIMNyvsusUKoQQuf8gwKiEe8cFwbwFSicywlFk9la2IPe8oFShcxAzHLCCn/TIYUAvEL3/4LgaZvqWm80qCPYbgIP5HT8hPYkKWJ4WYknEWK+3InbnkzteFfGrQFCq4CCAESEGnj6Ji7LD+4o7MoHYT4jBQYjtW+kQUijgIwggEKAoIBAQDY9um1ifBRIOmkPtDZTqH+CZUBbb0eK0Cn3NHFf8MFUDzPEz+emK/OTub/hNxCJCao//pP5L8tRNUPFDrrvCBMo7Rn+iUb+mA/2yXiJ6ivqcN9Cu9i5qOU1ygon9SWZRsujFFB8nxVreY5Lzeq0283zn1Cg1stcX4tOHT7utPzFG/ReDFQt0O/GLlzVwB0d1sn3SKMO4XLjhZdncrtF9jljpg7xjMIlnWJUqxDo7TQkTytJmUl0kcM7bndBLerAdJFGaXc6oSY4eNy/IGDluLCQR3KZEQsy/mLeV1ggQ44MFr7XOM+rd+4/314q/deQbjHqjWFuVr8iIaKbq+R63ShAgMBAAEo8CISgAMii2Mw6z+Qs1bvvxGStie9tpcgoO2uAt5Zvv0CDXvrFlwnSbo+qR71Ru2IlZWVSbN5XYSIDwcwBzHjY8rNr3fgsXtSJty425djNQtF5+J2jrAhf3Q2m7EI5aohZGpD2E0cr+dVj9o8x0uJR2NWR8FVoVQSXZpad3M/4QzBLNto/tz+UKyZwa7Sc/eTQc2+ZcDS3ZEO3lGRsH864Kf/cEGvJRBBqcpJXKfG+ItqEW1AAPptjuggzmZEzRq5xTGf6or+bXrKjCpBS9G1SOyvCNF1k5z6lG8KsXhgQxL6ADHMoulxvUIihyPY5MpimdXfUdEQ5HA2EqNiNVNIO4qP007jW51yAeThOry4J22xs8RdkIClOGAauLIl0lLA4flMzW+VfQl5xYxP0E5tuhn0h+844DslU8ZF7U1dU2QprIApffXD9wgAACk26Rggy8e96z8i86/+YYyZQkc9hIdCAERrgEYCEbByzONrdRDs1MrS/ch1moV5pJv63BIKvQHGvLkaFwoMY29tcGFueV9uYW1lEgd1bmtub3duGioKCm1vZGVsX25hbWUSHEFuZHJvaWQgU0RLIGJ1aWx0IGZvciB4ODZfNjQaGwoRYXJjaGl0ZWN0dXJlX25hbWUSBng4Nl82NBodCgtkZXZpY2VfbmFtZRIOZ2VuZXJpY194ODZfNjQaIAoMcHJvZHVjdF9uYW1lEhBzZGtfcGhvbmVfeDg2XzY0GmMKCmJ1aWxkX2luZm8SVUFuZHJvaWQvc2RrX3Bob25lX3g4Nl82NC9nZW5lcmljX3g4Nl82NDo5L1BTUjEuMTgwNzIwLjAxMi80OTIzMjE0OnVzZXJkZWJ1Zy90ZXN0LWtleXMaHgoUd2lkZXZpbmVfY2RtX3ZlcnNpb24SBjE0LjAuMBokCh9vZW1fY3J5cHRvX3NlY3VyaXR5X3BhdGNoX2xldmVsEgEwMg4QASAAKA0wAEAASABQAA=="""

85
gamdl/itunes_api.py Normal file
View File

@ -0,0 +1,85 @@
from __future__ import annotations
import functools
import requests
from .apple_music_api import AppleMusicApi
from .constants import STOREFRONT_IDS
class ItunesApi:
ITUNES_LOOKUP_API_URL = "https://itunes.apple.com/lookup"
ITUNES_PAGE_API_URL = "https://music.apple.com"
def __init__(
self,
storefront: str = "us",
language: str = "en-US",
):
self.storefront = storefront
self.language = language
self._setup_session()
def _setup_session(self):
try:
self.storefront_id = STOREFRONT_IDS[self.storefront.upper()]
except KeyError:
raise Exception(f"No storefront id for {self.storefront}")
self.session = requests.Session()
self.session.params = {
"country": self.storefront,
"lang": self.language,
}
self.session.headers = {
"X-Apple-Store-Front": f"{self.storefront_id} t:music31",
}
@functools.lru_cache()
def get_resource(
self,
resource_id: str,
entity: str = "album",
) -> dict:
response = self.session.get(
self.ITUNES_LOOKUP_API_URL,
params={
"id": resource_id,
"entity": entity,
},
)
try:
response.raise_for_status()
response_dict = response.json()
resource = response_dict.get("results")
assert resource
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
AssertionError,
):
AppleMusicApi._raise_response_exception(response)
return resource
def get_itunes_page(
self,
resource_type: str,
resource_id: str,
) -> dict:
response = self.session.get(
f"{self.ITUNES_PAGE_API_URL}/{resource_type}/{resource_id}"
)
try:
response.raise_for_status()
response_dict = response.json()
itunes_page = response_dict["storePlatformData"]["product-dv"][
"results"
].get(resource_id)
assert itunes_page
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
AssertionError,
):
AppleMusicApi._raise_response_exception(response)
return itunes_page

25
gamdl/models.py Normal file
View File

@ -0,0 +1,25 @@
from dataclasses import dataclass
@dataclass
class UrlInfo:
storefront: str = None
type: str = None
id: str = None
@dataclass
class DownloadQueueItem:
metadata: dict = None
@dataclass
class Lyrics:
synced: str = None
unsynced: str = None
@dataclass
class StreamInfo:
stream_url: str = None
pssh: str = None

View File

@ -1,9 +1,17 @@
[project]
name = "gamdl"
description = "Download Apple Music songs/music videos/albums/playlists"
requires-python = ">=3.7"
requires-python = ">=3.8"
authors = [{ name = "glomatico" }]
dependencies = ["click", "m3u8", "pywidevine", "pyyaml", "yt-dlp", "ciso8601"]
dependencies = [
"ciso8601",
"click",
"m3u8",
"tabulate",
"pywidevine",
"pyyaml",
"yt-dlp",
]
readme = "README.md"
dynamic = ["version"]

View File

@ -1,6 +1,7 @@
ciso8601
click
m3u8
tabulate
pywidevine
pyyaml
yt-dlp
ciso8601