#!/bin/python3
"""Download songs listed in a text file from YouTube Music and tag them.

Every non-comment line of the input file is used as a YouTube Music search
query. The first "songs" result is downloaded with yt-dlp and, when the
output format is MP3, tagged with title/artist/album/genre via mutagen.
"""
import argparse
import os
import re
import subprocess
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

from mutagen.easyid3 import EasyID3
from mutagen.mp3 import MP3
from ytmusicapi import YTMusic

# Initialize the YTMusic API
ytmusic = YTMusic()

# Constants for maximum threads and print lock
MAX_THREADS = 4
print_lock = threading.Lock()


def _expand_home(path):
    """Return *path* with every literal ``$HOME`` replaced by the home directory.

    ``os.path.expanduser("~")`` is used instead of ``os.environ["HOME"]`` so
    the lookup does not raise ``KeyError`` when ``HOME`` is not set.
    """
    return path.replace("$HOME", os.path.expanduser("~"))


def sanitize_filename(name):
    """Return *name* with characters that are unsafe in file names replaced.

    Each of ``\\ / * ? : " < > |`` is replaced by an underscore.
    """
    return re.sub(r'[\\/*?:"<>|]', "_", name)


def read_song_names(filename="$HOME/.config/pamus/list.txt"):
    """Read search queries from *filename*, one per line.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped; on the remaining lines everything from the first ``#`` onward
    is treated as a trailing comment and dropped.

    Args:
        filename: Path of the list file. A literal ``$HOME`` is expanded.

    Returns:
        A list of stripped, non-empty query strings in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    filename = _expand_home(filename)
    # Explicit encoding so non-ASCII song names do not depend on the locale.
    with open(filename, "r", encoding="utf-8") as f:
        queries = (line.split("#", 1)[0].strip() for line in f)
        # An empty query means the line was blank or a pure comment line.
        return [query for query in queries if query]


def download_audio(video_id, sanitized_title, output_dir, audio_format, verbose=False):
    """Download the audio track of a YouTube video using yt-dlp.

    Args:
        video_id: YouTube video id appended to the watch URL.
        sanitized_title: File name (without extension) for the result.
        output_dir: Destination directory; created if it does not exist.
        audio_format: Value passed to yt-dlp's ``--audio-format``.
        verbose: When false, yt-dlp is run with ``--quiet``.

    Raises:
        subprocess.CalledProcessError: If yt-dlp exits with a non-zero status.
        OSError: If the directory cannot be created or yt-dlp cannot be run.
    """
    os.makedirs(output_dir, exist_ok=True)
    url = f"https://www.youtube.com/watch?v={video_id}"

    # yt-dlp parses "%" in the output template as a field directive, so
    # literal percent signs in the path must be doubled to survive.
    literal_stem = os.path.join(output_dir, sanitized_title).replace("%", "%%")
    output_template = f"{literal_stem}.%(ext)s"

    command = ["yt-dlp", "-x", "--audio-format", audio_format, "--no-warnings"]
    if not verbose:
        # Only add the flag when wanted; an empty-string placeholder would be
        # handed to yt-dlp as an extra (invalid) positional argument.
        command.append("--quiet")
    command += ["--output", output_template, url]

    subprocess.run(command, check=True)


def tag_audio(file_path, metadata):
    """Write title, artist, album and genre tags to an MP3 file.

    The file is opened with mutagen's ``MP3`` class using ``EasyID3`` tags,
    so *file_path* must point to an MP3 file.

    Args:
        file_path: Path of the MP3 file to tag.
        metadata: Mapping with optional keys ``title``, ``artists`` (a list
            of names, joined with ``", "``), ``album`` and ``genre``.
    """
    audio = MP3(file_path, ID3=EasyID3)
    audio["title"] = metadata.get("title", "")
    audio["artist"] = ", ".join(metadata.get("artists", []))
    audio["album"] = metadata.get("album", "")
    audio["genre"] = metadata.get("genre", "")
    audio.save()


def process_song(song_query, output_dir, audio_format, verbose):
    """Search for one song, download it and (for MP3) tag it.

    Progress is printed under ``print_lock`` so messages from worker threads
    do not interleave. Any exception is reported on stderr rather than
    propagated, so one failing song does not stop the others.

    Args:
        song_query: Search string passed to YouTube Music.
        output_dir: Directory the audio file is written to.
        audio_format: Target audio format / file extension.
        verbose: Forwarded to ``download_audio``.
    """
    try:
        results = ytmusic.search(song_query, filter="songs")
        if not results:
            with print_lock:
                print(f"No results found for '{song_query}'", file=sys.stderr)
            return

        song = results[0]
        title = song["title"]
        sanitized_title = sanitize_filename(title)
        audio_path = os.path.join(output_dir, f"{sanitized_title}.{audio_format}")

        if os.path.exists(audio_path):
            with print_lock:
                print(f"Already downloaded: {title}")
            return

        with print_lock:
            print(f"Downloading: {title}")
        download_audio(song["videoId"], sanitized_title, output_dir, audio_format, verbose)

        if not os.path.exists(audio_path):
            with print_lock:
                print(f"Download failed: {title}", file=sys.stderr)
            return

        # tag_audio only understands MP3/ID3; tagging any other format would
        # raise and misreport a successful download as an error.
        if audio_format == "mp3":
            metadata = {
                "title": title,
                "artists": [a["name"] for a in song.get("artists") or []],
                # "album" may be present with a None value, so `or {}` is
                # needed in addition to the .get() default.
                "album": (song.get("album") or {}).get("name") or "",
                # NOTE(review): resultType looks like the kind of search
                # result rather than a musical genre - confirm this is
                # the intended value for the genre tag.
                "genre": song.get("resultType", ""),
            }
            tag_audio(audio_path, metadata)

        with print_lock:
            print(f"Done: {title}")
    except Exception as e:
        # Worker-thread boundary: report and carry on with the other songs.
        with print_lock:
            print(f"Error processing '{song_query}': {e}", file=sys.stderr)


def main():
    """Parse command-line options and download every listed song in parallel."""
    parser = argparse.ArgumentParser(description="Download and tag songs from YouTube Music")
    parser.add_argument(
        "-o", "--output-dir",
        type=str,
        default="$HOME/music",  # Default download path
        help="Directory to save the downloaded audio files",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose output",
    )
    parser.add_argument(
        "-i", "--input-file",
        type=str,
        default="$HOME/.config/pamus/list.txt",  # Default input file path
        help="Input file containing song names to download",
    )
    parser.add_argument(
        "-f", "--audio-format",
        type=str,
        choices=["mp3", "aac", "flac", "opus", "wav"],
        default="mp3",
        help="Audio format to download (default: mp3)",
    )
    args = parser.parse_args()

    # Replace $HOME with the actual home directory for both paths.
    output_dir = _expand_home(args.output_dir)
    input_file = _expand_home(args.input_file)

    try:
        song_names = read_song_names(input_file)
    except OSError as e:
        # A missing or unreadable list file is a user error, not a crash.
        print(f"Cannot read input file '{input_file}': {e}", file=sys.stderr)
        sys.exit(1)

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = [
            executor.submit(process_song, name, output_dir, args.audio_format, args.verbose)
            for name in song_names
        ]
        for _ in as_completed(futures):
            pass  # reporting is handled inside process_song()


if __name__ == "__main__":
    main()