"""Music downloader module.

Provides NetEase Cloud Music download features, including:
- fetching music information
- downloading files to local disk
- downloading into memory
- writing music tags
- asynchronous download support
"""

import os
import re
import asyncio
import logging
import aiohttp
import aiofiles
from io import BytesIO
from typing import Dict, List, Optional, Tuple, Any, Union
from pathlib import Path
from dataclasses import dataclass
from enum import Enum

import requests
from mutagen.flac import FLAC, Picture
from mutagen.mp3 import MP3
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TRCK, APIC
from mutagen.mp4 import MP4

from music_api import NeteaseAPI, APIException
from cookie_manager import CookieManager

logger = logging.getLogger(__name__)


class AudioFormat(Enum):
    """Audio container formats recognised by the downloader."""

    MP3 = "mp3"
    FLAC = "flac"
    M4A = "m4a"
    UNKNOWN = "unknown"


class QualityLevel(Enum):
    """Audio quality levels that can be requested."""

    STANDARD = "standard"  # standard
    EXHIGH = "exhigh"  # very high
    LOSSLESS = "lossless"  # lossless
    HIRES = "hires"  # Hi-Res
    SKY = "sky"  # immersive surround sound
    JYEFFECT = "jyeffect"  # HD surround sound
    JYMASTER = "jymaster"  # ultra-HD master


@dataclass
class MusicInfo:
    """Metadata describing one downloadable track.

    ``duration`` is stored in seconds (``get_music_info`` divides the API
    value by 1000). ``lyric`` and ``tlyric`` hold the ``lrc`` and ``tlyric``
    entries of the lyric API response; both default to an empty string.
    """

    id: int
    name: str
    artists: str
    album: str
    pic_url: str
    duration: int
    track_number: int
    download_url: str
    file_type: str
    file_size: int
    quality: str
    lyric: str = ""
    tlyric: str = ""


@dataclass
class DownloadResult:
    """Outcome of a download attempt.

    On failure ``success`` is False and ``error_message`` explains why;
    on success ``file_path`` and ``file_size`` describe the local file.
    """

    success: bool
    file_path: Optional[str] = None
    file_size: int = 0
    error_message: str = ""
    music_info: Optional[MusicInfo] = None


class DownloadException(Exception):
    """Raised when music information or audio data cannot be obtained."""


class MusicDownloader:
    """Main music downloader class."""

    def __init__(self, download_dir: str = "downloads", max_concurrent: int = 3):
        """Initialise the downloader.

        Args:
            download_dir: Directory downloaded files are written to. It is
                created (including missing parents) if it does not exist.
            max_concurrent: Maximum number of concurrent downloads used by
                ``download_batch_async``.
        """
        self.download_dir = Path(download_dir)
        # parents=True so that a nested path such as "data/music" also works.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.max_concurrent = max_concurrent

        # Collaborators.
        self.cookie_manager = CookieManager()
        self.api = NeteaseAPI()

        # Supported file formats.
        self.supported_formats = {
            'mp3': AudioFormat.MP3,
            'flac': AudioFormat.FLAC,
            'm4a': AudioFormat.M4A
        }

    def _sanitize_filename(self, filename: str) -> str:
        """Return ``filename`` with illegal characters replaced.

        Args:
            filename: Raw file name (without extension).

        Returns:
            A name in which ``<>:"/\\|?*`` are replaced by ``_``, surrounding
            spaces and dots are removed, and the length is at most 200
            characters. ``"unknown"`` is returned if nothing is left.
        """
        illegal_chars = r'[<>:"/\\|?*]'
        filename = re.sub(illegal_chars, '_', filename)

        # Strip surrounding spaces and dots.
        filename = filename.strip(' .')

        # Limit the length; truncation may expose a new trailing space/dot,
        # so strip the right-hand side again afterwards.
        if len(filename) > 200:
            filename = filename[:200].rstrip(' .')

        return filename or "unknown"

    def _determine_file_extension(self, url: str, content_type: str = "") -> str:
        """Pick a file extension from the URL and the Content-Type header.

        Args:
            url: Download URL.
            content_type: HTTP Content-Type header value (may be empty).

        Returns:
            ``'.flac'``, ``'.mp3'`` or ``'.m4a'``; ``'.mp3'`` when neither
            the URL nor the content type gives a hint.
        """
        # The URL takes precedence.
        lowered_url = url.lower()
        for ext in ('.flac', '.mp3', '.m4a'):
            if ext in lowered_url:
                return ext

        # Fall back to the Content-Type header.
        content_type = (content_type or "").lower()
        if 'flac' in content_type:
            return '.flac'
        if 'mpeg' in content_type or 'mp3' in content_type:
            return '.mp3'
        if 'mp4' in content_type or 'm4a' in content_type:
            return '.m4a'

        return '.mp3'  # default

    def _build_file_path(self, music_info: MusicInfo) -> Path:
        """Return the local path ``"<artists> - <name><ext>"`` for a track."""
        filename = f"{music_info.artists} - {music_info.name}"
        safe_filename = self._sanitize_filename(filename)
        file_ext = self._determine_file_extension(music_info.download_url)
        return self.download_dir / f"{safe_filename}{file_ext}"

    @staticmethod
    def _success_result(file_path: Path, music_info: MusicInfo) -> DownloadResult:
        """Build a successful ``DownloadResult`` for an existing file."""
        return DownloadResult(
            success=True,
            file_path=str(file_path),
            file_size=file_path.stat().st_size,
            music_info=music_info
        )

    @staticmethod
    def _remove_partial_file(file_path: Path) -> None:
        """Delete a partially written file, ignoring filesystem errors."""
        try:
            os.remove(file_path)
        except OSError:
            # Best effort: the original download error is more useful to the
            # caller than a secondary cleanup failure.
            pass

    def get_music_info(self, music_id: int, quality: str = "standard") -> MusicInfo:
        """Fetch detailed information for a track.

        Args:
            music_id: Track ID.
            quality: Requested quality level.

        Returns:
            A populated ``MusicInfo`` object.

        Raises:
            DownloadException: If the play URL or the track details cannot
                be obtained, or any API call fails.
        """
        try:
            cookies = self.cookie_manager.parse_cookies()

            # Play/download URL information.
            url_result = self.api.get_song_url(music_id, quality, cookies)
            url_entries = url_result.get('data') if url_result else None
            if not url_entries:
                raise DownloadException(f"无法获取音乐ID {music_id} 的播放链接")

            song_data = url_entries[0]
            download_url = song_data.get('url') or ''
            if not download_url:
                raise DownloadException(f"音乐ID {music_id} 无可用的下载链接")

            # Track details.
            detail_result = self.api.get_song_detail(music_id)
            songs = detail_result.get('songs') if detail_result else None
            if not songs:
                raise DownloadException(f"无法获取音乐ID {music_id} 的详细信息")

            song_detail = songs[0]

            # Lyrics; `or` fallbacks guard against explicit None values in
            # the response.
            lyric_result = self.api.get_lyric(music_id, cookies) or {}
            lyric = (lyric_result.get('lrc') or {}).get('lyric') or ''
            tlyric = (lyric_result.get('tlyric') or {}).get('lyric') or ''

            # Artists joined into a single "/"-separated string.
            artists = '/'.join(
                artist['name'] for artist in (song_detail.get('ar') or [])
            )
            album = song_detail.get('al') or {}

            return MusicInfo(
                id=music_id,
                name=song_detail.get('name') or '未知歌曲',
                artists=artists or '未知艺术家',
                album=album.get('name') or '未知专辑',
                pic_url=album.get('picUrl') or '',
                duration=(song_detail.get('dt') or 0) // 1000,  # to seconds
                track_number=song_detail.get('no') or 0,
                download_url=download_url,
                file_type=(song_data.get('type') or 'mp3').lower(),
                file_size=song_data.get('size') or 0,
                quality=quality,
                lyric=lyric,
                tlyric=tlyric
            )

        except DownloadException:
            # Already carries a precise message; do not wrap it a second time.
            raise
        except APIException as e:
            raise DownloadException(f"API调用失败: {e}") from e
        except Exception as e:
            raise DownloadException(f"获取音乐信息时发生错误: {e}") from e

    def download_music_file(self, music_id: int, quality: str = "standard") -> DownloadResult:
        """Download a track to the local download directory.

        If the target file already exists it is reported as a success
        without downloading again.

        Args:
            music_id: Track ID.
            quality: Requested quality level.

        Returns:
            A ``DownloadResult``; HTTP and other runtime failures are
            reported through ``success=False`` and ``error_message``.

        Raises:
            DownloadException: If the track information cannot be fetched.
        """
        try:
            music_info = self.get_music_info(music_id, quality)
            file_path = self._build_file_path(music_info)

            # Skip the download when the file is already present.
            if file_path.exists():
                return self._success_result(file_path, music_info)

            # The context manager releases the streamed connection.
            with requests.get(music_info.download_url, stream=True, timeout=30) as response:
                response.raise_for_status()
                try:
                    with open(file_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                except BaseException:
                    # A truncated file would be mistaken for a finished
                    # download by the exists() check on the next call.
                    self._remove_partial_file(file_path)
                    raise

            self._write_music_tags(file_path, music_info)

            return self._success_result(file_path, music_info)

        except DownloadException:
            raise
        except requests.RequestException as e:
            return DownloadResult(
                success=False,
                error_message=f"下载请求失败: {e}"
            )
        except Exception as e:
            return DownloadResult(
                success=False,
                error_message=f"下载过程中发生错误: {e}"
            )

    async def download_music_file_async(self, music_id: int, quality: str = "standard") -> DownloadResult:
        """Asynchronously download a track to the local download directory.

        If the target file already exists it is reported as a success
        without downloading again.

        Args:
            music_id: Track ID.
            quality: Requested quality level.

        Returns:
            A ``DownloadResult``; HTTP and other runtime failures are
            reported through ``success=False`` and ``error_message``.

        Raises:
            DownloadException: If the track information cannot be fetched.
        """
        try:
            # NOTE(review): get_music_info is synchronous and runs on the
            # event loop. It is not moved to a worker thread here because the
            # thread-safety of NeteaseAPI/CookieManager is not visible from
            # this module -- confirm before offloading.
            music_info = self.get_music_info(music_id, quality)
            file_path = self._build_file_path(music_info)

            # Skip the download when the file is already present.
            if file_path.exists():
                return self._success_result(file_path, music_info)

            async with aiohttp.ClientSession() as session:
                async with session.get(music_info.download_url) as response:
                    response.raise_for_status()
                    try:
                        async with aiofiles.open(file_path, 'wb') as f:
                            async for chunk in response.content.iter_chunked(8192):
                                await f.write(chunk)
                    except BaseException:
                        # Also covers task cancellation: never leave a
                        # truncated file behind.
                        self._remove_partial_file(file_path)
                        raise

            # Tag writing performs blocking HTTP and file I/O, so run it in
            # the default executor instead of stalling the event loop.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, self._write_music_tags, file_path, music_info
            )

            return self._success_result(file_path, music_info)

        except DownloadException:
            raise
        except aiohttp.ClientError as e:
            return DownloadResult(
                success=False,
                error_message=f"异步下载请求失败: {e}"
            )
        except Exception as e:
            return DownloadResult(
                success=False,
                error_message=f"异步下载过程中发生错误: {e}"
            )

    def download_music_to_memory(self, music_id: int, quality: str = "standard") -> Tuple[bool, BytesIO, MusicInfo]:
        """Download a track into memory.

        Args:
            music_id: Track ID.
            quality: Requested quality level.

        Returns:
            A ``(success, audio stream, music info)`` tuple; ``success`` is
            always True because failures are raised.

        Raises:
            DownloadException: If fetching the information or the audio
                data fails.
        """
        try:
            music_info = self.get_music_info(music_id, quality)

            response = requests.get(music_info.download_url, timeout=30)
            response.raise_for_status()

            audio_data = BytesIO(response.content)

            return True, audio_data, music_info

        except DownloadException:
            raise
        except requests.RequestException as e:
            raise DownloadException(f"下载到内存失败: {e}") from e
        except Exception as e:
            raise DownloadException(f"内存下载过程中发生错误: {e}") from e

    async def download_batch_async(self, music_ids: List[int], quality: str = "standard") -> List[DownloadResult]:
        """Download several tracks concurrently.

        At most ``self.max_concurrent`` downloads run at the same time.

        Args:
            music_ids: Track IDs to download.
            quality: Requested quality level.

        Returns:
            One ``DownloadResult`` per input ID, in the same order. A
            download that raised is reported as a failed result.
        """
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def download_with_semaphore(music_id: int) -> DownloadResult:
            async with semaphore:
                return await self.download_music_file_async(music_id, quality)

        tasks = [download_with_semaphore(music_id) for music_id in music_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Convert raised exceptions into failed results. BaseException is
        # checked so a CancelledError is never returned as if it were a
        # DownloadResult.
        processed_results = []
        for music_id, result in zip(music_ids, results):
            if isinstance(result, BaseException):
                processed_results.append(DownloadResult(
                    success=False,
                    error_message=f"下载音乐ID {music_id} 时发生异常: {result}"
                ))
            else:
                processed_results.append(result)

        return processed_results

    def _fetch_cover_data(self, pic_url: str) -> Optional[bytes]:
        """Download cover art; return None if unavailable.

        A failed cover download must not abort tag writing, so every error
        is logged and swallowed here.
        """
        if not pic_url:
            return None
        try:
            pic_response = requests.get(pic_url, timeout=10)
            pic_response.raise_for_status()
            return pic_response.content or None
        except Exception as e:
            logger.debug("封面下载失败: %s", e)
            return None

    def _write_music_tags(self, file_path: Path, music_info: MusicInfo) -> None:
        """Write tag metadata into the downloaded file.

        The writer is chosen from the file suffix; files with any other
        suffix are left untouched. Errors are logged, never raised.

        Args:
            file_path: Path of the audio file.
            music_info: Metadata to write.
        """
        try:
            file_ext = file_path.suffix.lower()

            if file_ext == '.mp3':
                self._write_mp3_tags(file_path, music_info)
            elif file_ext == '.flac':
                self._write_flac_tags(file_path, music_info)
            elif file_ext == '.m4a':
                self._write_m4a_tags(file_path, music_info)

        except Exception as e:
            logger.warning("写入音乐标签失败: %s", e)

    def _write_mp3_tags(self, file_path: Path, music_info: MusicInfo) -> None:
        """Write ID3 tags (title, artist, album, track, cover) to an MP3."""
        try:
            audio = MP3(str(file_path), ID3=ID3)

            # A file without an ID3 header has tags == None; create the
            # container first, otherwise every add() below would fail.
            if audio.tags is None:
                audio.add_tags()

            audio.tags.add(TIT2(encoding=3, text=music_info.name))
            audio.tags.add(TPE1(encoding=3, text=music_info.artists))
            audio.tags.add(TALB(encoding=3, text=music_info.album))

            if music_info.track_number > 0:
                audio.tags.add(TRCK(encoding=3, text=str(music_info.track_number)))

            # Cover art is optional.
            cover_data = self._fetch_cover_data(music_info.pic_url)
            if cover_data:
                audio.tags.add(APIC(
                    encoding=3,
                    mime='image/jpeg',
                    type=3,
                    desc='Cover',
                    data=cover_data
                ))

            audio.save()
        except Exception as e:
            logger.warning("写入MP3标签失败: %s", e)

    def _write_flac_tags(self, file_path: Path, music_info: MusicInfo) -> None:
        """Write Vorbis comments and cover art to a FLAC file."""
        try:
            audio = FLAC(str(file_path))

            audio['TITLE'] = music_info.name
            audio['ARTIST'] = music_info.artists
            audio['ALBUM'] = music_info.album

            if music_info.track_number > 0:
                audio['TRACKNUMBER'] = str(music_info.track_number)

            # Cover art is optional.
            cover_data = self._fetch_cover_data(music_info.pic_url)
            if cover_data:
                picture = Picture()
                picture.type = 3  # Cover (front)
                picture.mime = 'image/jpeg'
                picture.desc = 'Cover'
                picture.data = cover_data
                audio.add_picture(picture)

            audio.save()
        except Exception as e:
            logger.warning("写入FLAC标签失败: %s", e)

    def _write_m4a_tags(self, file_path: Path, music_info: MusicInfo) -> None:
        """Write MP4 atoms (title, artist, album, track, cover) to an M4A."""
        try:
            audio = MP4(str(file_path))

            audio['\xa9nam'] = music_info.name
            audio['\xa9ART'] = music_info.artists
            audio['\xa9alb'] = music_info.album

            if music_info.track_number > 0:
                audio['trkn'] = [(music_info.track_number, 0)]

            # Cover art is optional.
            cover_data = self._fetch_cover_data(music_info.pic_url)
            if cover_data:
                audio['covr'] = [cover_data]

            audio.save()
        except Exception as e:
            logger.warning("写入M4A标签失败: %s", e)

    def get_download_progress(self, music_id: int, quality: str = "standard") -> Dict[str, Any]:
        """Report download progress for a track.

        Progress is derived by comparing the size of the local file with
        the size reported for the track.

        Args:
            music_id: Track ID.
            quality: Requested quality level.

        Returns:
            A dict with ``music_id``, ``filename``, ``total_size``,
            ``current_size``, ``progress`` (0-100) and ``completed``. If an
            error occurs the dict instead holds ``music_id``, ``error``,
            ``progress`` and ``completed``.
        """
        try:
            music_info = self.get_music_info(music_id, quality)
            file_path = self._build_file_path(music_info)
            total_size = music_info.file_size

            if file_path.exists():
                current_size = file_path.stat().st_size
                progress = (current_size / total_size * 100) if total_size > 0 else 0

                return {
                    'music_id': music_id,
                    'filename': file_path.name,
                    'total_size': total_size,
                    'current_size': current_size,
                    'progress': min(progress, 100),
                    'completed': current_size >= total_size
                }

            return {
                'music_id': music_id,
                'filename': file_path.name,
                'total_size': total_size,
                'current_size': 0,
                'progress': 0,
                'completed': False
            }

        except Exception as e:
            return {
                'music_id': music_id,
                'error': str(e),
                'progress': 0,
                'completed': False
            }


if __name__ == "__main__":
    # Smoke test: construct a downloader and list the supported features.
    downloader = MusicDownloader()
    print("音乐下载器模块")
    print("支持的功能:")
    print("- 同步下载")
    print("- 异步下载")
    print("- 批量下载")
    print("- 内存下载")
    print("- 音乐标签写入")
    print("- 下载进度跟踪")