diff --git a/README.md b/README.md index 82bc4bf..0477023 100644 --- a/README.md +++ b/README.md @@ -1,138 +1,138 @@ -# 网易云无损音乐解析 - -> **声明** -> 本项目为开源软件,遵循 MIT 许可证。任何个人或组织均可自由使用、修改和分发本项目的源代码。但本项目及其任何衍生作品**禁止用于任何商业或付费项目**。如有违反,将视为对本项目许可证的侵犯。欢迎大家在遵守开源精神和许可证的前提下积极贡献和分享代码。 - ---- - -## 功能简介 - -本项目可解析网易云音乐无损音质下载链接,支持多种音质选择,支持 API 与命令行(GUI)两种模式。 - ---- - -## 快速开始 - -### 1. 安装依赖 - -```bash -pip install -r requirements.txt -``` - -### 2. 配置 Cookie - -请在 `cookie.txt` 文件中填入黑胶会员账号的 Cookie,格式如下: - -``` -MUSIC_U=你的MUSIC_U值;os=pc;appver=8.9.70; -``` - -> 具体值请参考 `cookie.txt` 示例,替换为你自己的即可。 - -### 3. 运行 - -#### GUI 模式 - -```bash -python main.py --mode gui --url <网易云音乐地址> --level <音质参数> -``` - -#### API 模式 - -```bash -python main.py --mode api -``` - -- 访问接口:http://ip:port/类型解析 -- 支持 GET 和 POST 请求 - ---- - -## 参数说明 - -### GUI 模式参数 - -| 参数 | 说明 | -| ------------ | ---------------------------- | -| --mode | 启动模式:api 或 gui | -| --url | 需要解析的网易云音乐地址 | -| --level | 音质参数(见下方音质说明) | - -### API 模式参数 - -| 参数 | 说明 | -| ------------ | -------------------------------------------- | -| url / ids | 网易云音乐地址或歌曲ID(二选一) | -| level | 音质参数(见下方音质说明) | -| type | 解析类型:json / down / text(三选一) | - -| 类型参数 | 说明 | -| ------------ | -------------------------------------------- | -| Song_v1 | 单曲解析 | -| search | 搜索解析 | -| playlist | 歌单解析 | -| album | 专辑解析 | - ---- - -## 音质参数说明(仅限单曲解析) - -- `standard`:标准音质 -- `exhigh`:极高音质 -- `lossless`:无损音质 -- `hires`:Hi-Res音质 -- `jyeffect`:高清环绕声 -- `sky`:沉浸环绕声 -- `jymaster`:超清母带 - -> 黑胶VIP音质:standard, exhigh, lossless, hires, jyeffect -> 黑胶SVIP音质:sky, jymaster - ---- - -## Docker 一键部署 - -1. **修改参数** - - - 如需修改端口,请编辑 `.env` 或 `docker-compose.yml` 文件中的 `ports` 配置,例如: - - ```yaml - ports: - - "8080:5000" - ``` - -2. **启动服务** - - ```bash - docker-compose up -d - ``` - ---- - -## 在线演示 - -[在线解析](https://api.toubiec.cn/wyapi.html) - ---- - -## 注意事项 - -- 必须使用黑胶会员账号的 Cookie 才能解析高音质资源。 -- Cookie 格式请严格按照 `cookie.txt` 示例填写。 - ---- - -## 致谢 - -- [Ravizhan](https://github.com/ravizhan) - ---- - -## 反馈与交流 - -- 在 Github [Issues](https://github.com/Suxiaoqinx/Netease_url/issues) 提交反馈 -- 或访问 [我的博客](https://www.toubiec.cn) - ---- - +# 网易云无损音乐解析 + +> **声明** +> 本项目为开源软件,遵循 MIT 许可证。任何个人或组织均可自由使用、修改和分发本项目的源代码。但本项目及其任何衍生作品**禁止用于任何商业或付费项目**。如有违反,将视为对本项目许可证的侵犯。欢迎大家在遵守开源精神和许可证的前提下积极贡献和分享代码。 + +--- + +## 功能简介 + +本项目可解析网易云音乐无损音质下载链接,支持多种音质选择,支持 API 与命令行(GUI)两种模式。 + +--- + +## 快速开始 + +### 1. 安装依赖 + +```bash +pip install -r requirements.txt +``` + +### 2. 配置 Cookie + +请在 `cookie.txt` 文件中填入黑胶会员账号的 Cookie,格式如下: + +``` +MUSIC_U=你的MUSIC_U值;os=pc;appver=8.9.70; +``` + +> 具体值请参考 `cookie.txt` 示例,替换为你自己的即可。 + +### 3. 运行 + +#### GUI 模式 + +```bash +python main.py --mode gui --url <网易云音乐地址> --level <音质参数> +``` + +#### API 模式 + +```bash +python main.py --mode api +``` + +- 访问接口:http://ip:port/类型解析 +- 支持 GET 和 POST 请求 + +--- + +## 参数说明 + +### GUI 模式参数 + +| 参数 | 说明 | +| ------------ | ---------------------------- | +| --mode | 启动模式:api 或 gui | +| --url | 需要解析的网易云音乐地址 | +| --level | 音质参数(见下方音质说明) | + +### API 模式参数 + +| 参数 | 说明 | +| ------------ | -------------------------------------------- | +| url / ids | 网易云音乐地址或歌曲ID(二选一) | +| level | 音质参数(见下方音质说明) | +| type | 解析类型:json / down / text(三选一) | + +| 类型参数 | 说明 | +| ------------ | -------------------------------------------- | +| Song_v1 | 单曲解析 | +| search | 搜索解析 | +| playlist | 歌单解析 | +| album | 专辑解析 | + +--- + +## 音质参数说明(仅限单曲解析) + +- `standard`:标准音质 +- `exhigh`:极高音质 +- `lossless`:无损音质 +- `hires`:Hi-Res音质 +- `jyeffect`:高清环绕声 +- `sky`:沉浸环绕声 +- `jymaster`:超清母带 + +> 黑胶VIP音质:standard, exhigh, lossless, hires, jyeffect +> 黑胶SVIP音质:sky, jymaster + +--- + +## Docker 一键部署 + +1. **修改参数** + + - 如需修改端口,请编辑 `.env` 或 `docker-compose.yml` 文件中的 `ports` 配置,例如: + + ```yaml + ports: + - "8080:5000" + ``` + +2. **启动服务** + + ```bash + docker-compose up -d + ``` + +--- + +## 在线演示 + +[在线解析](https://api.toubiec.cn/wyapi.html) + +--- + +## 注意事项 + +- 必须使用黑胶会员账号的 Cookie 才能解析高音质资源。 +- Cookie 格式请严格按照 `cookie.txt` 示例填写。 + +--- + +## 致谢 + +- [Ravizhan](https://github.com/ravizhan) + +--- + +## 反馈与交流 + +- 在 Github [Issues](https://github.com/Suxiaoqinx/Netease_url/issues) 提交反馈 +- 或访问 [我的博客](https://www.toubiec.cn) + +--- + 欢迎 Star、Fork 和 PR! \ No newline at end of file diff --git a/cookie_manager.py b/cookie_manager.py index 6bf2317..9275b00 100644 --- a/cookie_manager.py +++ b/cookie_manager.py @@ -1,19 +1,469 @@ -import os -from typing import Dict - -class CookieManager: - def __init__(self, cookie_file: str = None): - if cookie_file is None: - script_dir = os.path.dirname(os.path.abspath(__file__)) - cookie_file = os.path.join(script_dir, 'cookie.txt') - self.cookie_file = cookie_file - - def read_cookie(self) -> str: - with open(self.cookie_file, 'r', encoding='utf-8') as f: - return f.read() - - @staticmethod - def parse_cookie(text: str) -> Dict[str, str]: - cookie_ = [item.strip().split('=', 1) for item in text.strip().split(';') if item] - cookie_ = {k.strip(): v.strip() for k, v in cookie_} - return cookie_ +"""Cookie管理器模块 + +提供网易云音乐Cookie管理功能,包括: +- Cookie文件读取和写入 +- Cookie格式验证和解析 +- Cookie有效性检查 +- 自动过期处理 +""" + +import os +import json +import time +from typing import Dict, Optional, List, Tuple, Any +from pathlib import Path +from dataclasses import dataclass +from datetime import datetime, timedelta +import logging + + +@dataclass +class CookieInfo: + """Cookie信息数据类""" + name: str + value: str + domain: str = "" + path: str = "/" + expires: Optional[int] = None + secure: bool = False + http_only: bool = False + + +class CookieException(Exception): + """Cookie相关异常类""" + pass + + +class CookieManager: + """Cookie管理器主类""" + + def __init__(self, cookie_file: str = "cookie.txt"): + """ + 初始化Cookie管理器 + + Args: + cookie_file: Cookie文件路径 + """ + self.cookie_file = Path(cookie_file) + self.logger = logging.getLogger(__name__) + + # 网易云音乐相关的重要Cookie字段 + self.important_cookies = { + 'MUSIC_U', # 用户标识 + 'MUSIC_A', # 用户认证 + '__csrf', # CSRF令牌 + 'NMTID', # 设备标识 + 'WEVNSM', # 会话管理 + 'WNMCID', # 客户端标识 + } + + # 确保cookie文件存在 + self._ensure_cookie_file_exists() + + def _ensure_cookie_file_exists(self) -> None: + """确保Cookie文件存在""" + if not self.cookie_file.exists(): + self.cookie_file.touch() + self.logger.info(f"创建Cookie文件: {self.cookie_file}") + + def read_cookie(self) -> str: + """读取Cookie文件内容 + + Returns: + Cookie字符串内容 + + Raises: + CookieException: 读取失败时抛出 + """ + try: + if not self.cookie_file.exists(): + self.logger.warning(f"Cookie文件不存在: {self.cookie_file}") + return "" + + content = self.cookie_file.read_text(encoding='utf-8').strip() + + if not content: + self.logger.warning("Cookie文件为空") + return "" + + self.logger.debug(f"成功读取Cookie文件,长度: {len(content)}") + return content + + except UnicodeDecodeError as e: + raise CookieException(f"Cookie文件编码错误: {e}") + except PermissionError as e: + raise CookieException(f"没有权限读取Cookie文件: {e}") + except Exception as e: + raise CookieException(f"读取Cookie文件失败: {e}") + + def write_cookie(self, cookie_content: str) -> bool: + """写入Cookie到文件 + + Args: + cookie_content: Cookie内容字符串 + + Returns: + 是否写入成功 + + Raises: + CookieException: 写入失败时抛出 + """ + try: + if not cookie_content or not cookie_content.strip(): + raise CookieException("Cookie内容不能为空") + + # 验证Cookie格式 + if not self.validate_cookie_format(cookie_content): + raise CookieException("Cookie格式无效") + + # 写入文件 + self.cookie_file.write_text(cookie_content.strip(), encoding='utf-8') + + self.logger.info(f"成功写入Cookie到文件: {self.cookie_file}") + return True + + except PermissionError as e: + raise CookieException(f"没有权限写入Cookie文件: {e}") + except Exception as e: + raise CookieException(f"写入Cookie文件失败: {e}") + + def parse_cookies(self) -> Dict[str, str]: + """解析Cookie字符串为字典 + + Returns: + Cookie字典 + + Raises: + CookieException: 解析失败时抛出 + """ + try: + cookie_content = self.read_cookie() + if not cookie_content: + return {} + + return self.parse_cookie_string(cookie_content) + + except Exception as e: + raise CookieException(f"解析Cookie失败: {e}") + + def parse_cookie_string(self, cookie_string: str) -> Dict[str, str]: + """解析Cookie字符串 + + Args: + cookie_string: Cookie字符串 + + Returns: + Cookie字典 + """ + if not cookie_string or not cookie_string.strip(): + return {} + + cookies = {} + + try: + # 处理多种Cookie格式 + cookie_string = cookie_string.strip() + + # 分割Cookie项 + cookie_pairs = [] + if ';' in cookie_string: + cookie_pairs = cookie_string.split(';') + elif '\n' in cookie_string: + cookie_pairs = cookie_string.split('\n') + else: + cookie_pairs = [cookie_string] + + for pair in cookie_pairs: + pair = pair.strip() + if not pair or '=' not in pair: + continue + + # 分割键值对 + key, value = pair.split('=', 1) + key = key.strip() + value = value.strip() + + if key and value: + cookies[key] = value + + self.logger.debug(f"解析得到 {len(cookies)} 个Cookie项") + return cookies + + except Exception as e: + self.logger.error(f"解析Cookie字符串失败: {e}") + return {} + + def validate_cookie_format(self, cookie_string: str) -> bool: + """验证Cookie格式是否有效 + + Args: + cookie_string: Cookie字符串 + + Returns: + 是否格式有效 + """ + if not cookie_string or not cookie_string.strip(): + return False + + try: + # 尝试解析Cookie + cookies = self.parse_cookie_string(cookie_string) + + # 检查是否至少包含一个有效的Cookie + if not cookies: + return False + + # 检查Cookie名称是否合法 + for name, value in cookies.items(): + if not name or not isinstance(name, str): + return False + if not isinstance(value, str): + return False + # 检查是否包含非法字符 + if any(char in name for char in [' ', '\t', '\n', '\r', ';', ',']): + return False + + return True + + except Exception: + return False + + def is_cookie_valid(self) -> bool: + """检查Cookie是否有效 + + Returns: + Cookie是否有效 + """ + try: + cookies = self.parse_cookies() + + if not cookies: + self.logger.warning("Cookie为空") + return False + + # 检查重要Cookie是否存在 + missing_cookies = self.important_cookies - set(cookies.keys()) + if missing_cookies: + self.logger.warning(f"缺少重要Cookie: {missing_cookies}") + return False + + # 检查MUSIC_U是否有效(基本验证) + music_u = cookies.get('MUSIC_U', '') + if not music_u or len(music_u) < 10: + self.logger.warning("MUSIC_U Cookie无效") + return False + + self.logger.debug("Cookie验证通过") + return True + + except Exception as e: + self.logger.error(f"Cookie验证失败: {e}") + return False + + def get_cookie_info(self) -> Dict[str, Any]: + """获取Cookie详细信息 + + Returns: + 包含Cookie信息的字典 + """ + try: + cookies = self.parse_cookies() + + info = { + 'file_path': str(self.cookie_file), + 'file_exists': self.cookie_file.exists(), + 'file_size': self.cookie_file.stat().st_size if self.cookie_file.exists() else 0, + 'cookie_count': len(cookies), + 'is_valid': self.is_cookie_valid(), + 'important_cookies_present': list(self.important_cookies & set(cookies.keys())), + 'missing_important_cookies': list(self.important_cookies - set(cookies.keys())), + 'all_cookie_names': list(cookies.keys()) + } + + # 添加文件修改时间 + if self.cookie_file.exists(): + mtime = self.cookie_file.stat().st_mtime + info['last_modified'] = datetime.fromtimestamp(mtime).isoformat() + + return info + + except Exception as e: + return { + 'error': str(e), + 'file_path': str(self.cookie_file), + 'file_exists': False, + 'is_valid': False + } + + def backup_cookie(self, backup_suffix: str = None) -> str: + """备份Cookie文件 + + Args: + backup_suffix: 备份文件后缀,默认使用时间戳 + + Returns: + 备份文件路径 + + Raises: + CookieException: 备份失败时抛出 + """ + try: + if not self.cookie_file.exists(): + raise CookieException("Cookie文件不存在,无法备份") + + if backup_suffix is None: + backup_suffix = datetime.now().strftime("%Y%m%d_%H%M%S") + + backup_path = self.cookie_file.with_suffix(f".{backup_suffix}.bak") + + # 复制文件内容 + content = self.cookie_file.read_text(encoding='utf-8') + backup_path.write_text(content, encoding='utf-8') + + self.logger.info(f"Cookie备份成功: {backup_path}") + return str(backup_path) + + except Exception as e: + raise CookieException(f"备份Cookie文件失败: {e}") + + def restore_cookie(self, backup_path: str) -> bool: + """从备份恢复Cookie + + Args: + backup_path: 备份文件路径 + + Returns: + 是否恢复成功 + + Raises: + CookieException: 恢复失败时抛出 + """ + try: + backup_file = Path(backup_path) + if not backup_file.exists(): + raise CookieException(f"备份文件不存在: {backup_path}") + + # 读取备份内容 + backup_content = backup_file.read_text(encoding='utf-8') + + # 验证备份内容 + if not self.validate_cookie_format(backup_content): + raise CookieException("备份文件中的Cookie格式无效") + + # 写入当前Cookie文件 + self.write_cookie(backup_content) + + self.logger.info(f"从备份恢复Cookie成功: {backup_path}") + return True + + except Exception as e: + raise CookieException(f"恢复Cookie失败: {e}") + + def clear_cookie(self) -> bool: + """清空Cookie文件 + + Returns: + 是否清空成功 + """ + try: + if self.cookie_file.exists(): + self.cookie_file.write_text("", encoding='utf-8') + self.logger.info("Cookie文件已清空") + return True + + except Exception as e: + self.logger.error(f"清空Cookie文件失败: {e}") + return False + + def update_cookie(self, new_cookies: Dict[str, str]) -> bool: + """更新Cookie + + Args: + new_cookies: 新的Cookie字典 + + Returns: + 是否更新成功 + """ + try: + if not new_cookies: + raise CookieException("新Cookie不能为空") + + # 读取现有Cookie + existing_cookies = self.parse_cookies() + + # 合并Cookie + existing_cookies.update(new_cookies) + + # 转换为Cookie字符串 + cookie_string = '; '.join(f"{k}={v}" for k, v in existing_cookies.items()) + + # 写入文件 + return self.write_cookie(cookie_string) + + except Exception as e: + self.logger.error(f"更新Cookie失败: {e}") + return False + + def get_cookie_for_request(self) -> Dict[str, str]: + """获取用于HTTP请求的Cookie字典 + + Returns: + 适用于requests库的Cookie字典 + """ + try: + cookies = self.parse_cookies() + + # 过滤掉空值 + filtered_cookies = {k: v for k, v in cookies.items() if k and v} + + return filtered_cookies + + except Exception as e: + self.logger.error(f"获取请求Cookie失败: {e}") + return {} + + def format_cookie_string(self, cookies: Dict[str, str]) -> str: + """将Cookie字典格式化为字符串 + + Args: + cookies: Cookie字典 + + Returns: + Cookie字符串 + """ + if not cookies: + return "" + + return '; '.join(f"{k}={v}" for k, v in cookies.items() if k and v) + + def __str__(self) -> str: + """字符串表示""" + info = self.get_cookie_info() + return f"CookieManager(file={info['file_path']}, valid={info['is_valid']}, count={info['cookie_count']})" + + def __repr__(self) -> str: + """详细字符串表示""" + return self.__str__() + + +if __name__ == "__main__": + # 测试代码 + import sys + + # 配置日志 + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + manager = CookieManager() + + print("Cookie管理器模块") + print("支持的功能:") + print("- Cookie文件读写") + print("- Cookie格式验证") + print("- Cookie有效性检查") + print("- Cookie备份和恢复") + print("- Cookie信息查看") + + # 显示当前Cookie信息 + info = manager.get_cookie_info() + print(f"\n当前Cookie状态: {manager}") + print(f"详细信息: {info}") diff --git a/main.py b/main.py index d40b3aa..ac9eebe 100644 --- a/main.py +++ b/main.py @@ -1,224 +1,686 @@ -import argparse -from flask import Flask, request, render_template, redirect, jsonify -from music_api import url_v1, name_v1, lyric_v1, search_music, playlist_detail, album_detail -from cookie_manager import CookieManager +"""网易云音乐API服务主程序 -# ================= 工具函数 ================= -cookie_manager = CookieManager() +提供网易云音乐相关API服务,包括: +- 歌曲信息获取 +- 音乐搜索 +- 歌单和专辑详情 +- 音乐下载 +- 健康检查 +""" -def ids(ids: str) -> str: - if '163cn.tv' in ids: - import requests - response = requests.get(ids, allow_redirects=False) - ids = response.headers.get('Location') - if 'music.163.com' in ids: - index = ids.find('id=') + 3 - ids = ids[index:].split('&')[0] - return ids +import os +import sys +import logging +import traceback +from typing import Dict, Any, Optional, Union, Tuple +from pathlib import Path +from dataclasses import dataclass +from urllib.parse import quote -def size(value: float) -> str: - units = ["B", "KB", "MB", "GB", "TB", "PB"] - size = 1024.0 - for i in range(len(units)): - if (value / size) < 1: - return "%.2f%s" % (value, units[i]) - value = value / size - return str(value) +from flask import Flask, request, jsonify, send_file, render_template, Response +from werkzeug.exceptions import BadRequest, NotFound, InternalServerError -def music_level1(value: str) -> str: - levels = { - 'standard': "标准音质", - 'exhigh': "极高音质", - 'lossless': "无损音质", - 'hires': "Hires音质", - 'sky': "沉浸环绕声", - 'jyeffect': "高清环绕声", - 'jymaster': "超清母带" - } - return levels.get(value, "未知音质") +try: + from music_api import ( + NeteaseAPI, APIException, QualityLevel, + url_v1, name_v1, lyric_v1, search_music, + playlist_detail, album_detail + ) + from cookie_manager import CookieManager, CookieException + from music_downloader import MusicDownloader, DownloadException, AudioFormat +except ImportError as e: + print(f"导入模块失败: {e}") + print("请确保所有依赖模块存在且可用") + sys.exit(1) -# ================= Flask 应用 ================= + +@dataclass +class APIConfig: + """API配置类""" + host: str = '0.0.0.0' + port: int = 5000 + debug: bool = False + downloads_dir: str = 'downloads' + max_file_size: int = 500 * 1024 * 1024 # 500MB + request_timeout: int = 30 + log_level: str = 'INFO' + cors_origins: str = '*' + + +class APIResponse: + """API响应工具类""" + + @staticmethod + def success(data: Any = None, message: str = 'success', status_code: int = 200) -> Tuple[Dict[str, Any], int]: + """成功响应""" + response = { + 'status': status_code, + 'success': True, + 'message': message + } + if data is not None: + response['data'] = data + return response, status_code + + @staticmethod + def error(message: str, status_code: int = 400, error_code: str = None) -> Tuple[Dict[str, Any], int]: + """错误响应""" + response = { + 'status': status_code, + 'success': False, + 'message': message + } + if error_code: + response['error_code'] = error_code + return response, status_code + + +class MusicAPIService: + """音乐API服务类""" + + def __init__(self, config: APIConfig): + self.config = config + self.logger = self._setup_logger() + self.cookie_manager = CookieManager() + self.netease_api = NeteaseAPI() + self.downloader = MusicDownloader() + + # 创建下载目录 + self.downloads_path = Path(config.downloads_dir) + self.downloads_path.mkdir(exist_ok=True) + + self.logger.info(f"音乐API服务初始化完成,下载目录: {self.downloads_path.absolute()}") + + def _setup_logger(self) -> logging.Logger: + """设置日志记录器""" + logger = logging.getLogger('music_api') + logger.setLevel(getattr(logging, self.config.log_level.upper())) + + if not logger.handlers: + # 控制台处理器 + console_handler = logging.StreamHandler() + console_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + console_handler.setFormatter(console_formatter) + logger.addHandler(console_handler) + + # 文件处理器 + try: + file_handler = logging.FileHandler('music_api.log', encoding='utf-8') + file_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s' + ) + file_handler.setFormatter(file_formatter) + logger.addHandler(file_handler) + except Exception as e: + logger.warning(f"无法创建日志文件: {e}") + + return logger + + def _get_cookies(self) -> Dict[str, str]: + """获取Cookie""" + try: + cookie_str = self.cookie_manager.read_cookie() + return self.cookie_manager.parse_cookie_string(cookie_str) + except CookieException as e: + self.logger.warning(f"获取Cookie失败: {e}") + return {} + except Exception as e: + self.logger.error(f"Cookie处理异常: {e}") + return {} + + def _extract_music_id(self, id_or_url: str) -> str: + """提取音乐ID""" + try: + # 处理短链接 + if '163cn.tv' in id_or_url: + import requests + response = requests.get(id_or_url, allow_redirects=False, timeout=10) + id_or_url = response.headers.get('Location', id_or_url) + + # 处理网易云链接 + if 'music.163.com' in id_or_url: + index = id_or_url.find('id=') + 3 + if index > 2: + return id_or_url[index:].split('&')[0] + + # 直接返回ID + return str(id_or_url).strip() + + except Exception as e: + self.logger.error(f"提取音乐ID失败: {e}") + return str(id_or_url).strip() + + def _format_file_size(self, size_bytes: int) -> str: + """格式化文件大小""" + if size_bytes == 0: + return "0B" + + units = ["B", "KB", "MB", "GB", "TB"] + size = float(size_bytes) + unit_index = 0 + + while size >= 1024.0 and unit_index < len(units) - 1: + size /= 1024.0 + unit_index += 1 + + return f"{size:.2f}{units[unit_index]}" + + def _get_quality_display_name(self, quality: str) -> str: + """获取音质显示名称""" + quality_names = { + 'standard': "标准音质", + 'exhigh': "极高音质", + 'lossless': "无损音质", + 'hires': "Hi-Res音质", + 'sky': "沉浸环绕声", + 'jyeffect': "高清环绕声", + 'jymaster': "超清母带" + } + return quality_names.get(quality, f"未知音质({quality})") + + def _validate_request_params(self, required_params: Dict[str, Any]) -> Optional[Tuple[Dict[str, Any], int]]: + """验证请求参数""" + for param_name, param_value in required_params.items(): + if not param_value: + return APIResponse.error(f"参数 '{param_name}' 不能为空", 400) + return None + + def _safe_get_request_data(self) -> Dict[str, Any]: + """安全获取请求数据""" + try: + if request.method == 'GET': + return dict(request.args) + else: + # 优先使用JSON数据,然后是表单数据 + json_data = request.get_json(silent=True) or {} + form_data = dict(request.form) + # 合并数据,JSON优先 + return {**form_data, **json_data} + except Exception as e: + self.logger.error(f"获取请求数据失败: {e}") + return {} + + +# 创建Flask应用和服务实例 +config = APIConfig() app = Flask(__name__) +api_service = MusicAPIService(config) + + +@app.before_request +def before_request(): + """请求前处理""" + # 记录请求信息 + api_service.logger.info( + f"{request.method} {request.path} - IP: {request.remote_addr} - " + f"User-Agent: {request.headers.get('User-Agent', 'Unknown')}" + ) + @app.after_request -def after_request(response): - response.headers.add('Access-Control-Allow-Origin', '*') +def after_request(response: Response) -> Response: + """请求后处理 - 设置CORS头""" + response.headers.add('Access-Control-Allow-Origin', config.cors_origins) response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization') response.headers.add('Access-Control-Allow-Methods', 'GET,POST,OPTIONS') + response.headers.add('Access-Control-Max-Age', '3600') + + # 记录响应信息 + api_service.logger.info(f"响应状态: {response.status_code}") return response -@app.route('/', methods=['GET', 'POST']) -def index(): + +@app.errorhandler(400) +def handle_bad_request(e): + """处理400错误""" + return APIResponse.error("请求参数错误", 400) + + +@app.errorhandler(404) +def handle_not_found(e): + """处理404错误""" + return APIResponse.error("请求的资源不存在", 404) + + +@app.errorhandler(500) +def handle_internal_error(e): + """处理500错误""" + api_service.logger.error(f"服务器内部错误: {e}") + return APIResponse.error("服务器内部错误", 500) + + +@app.route('/') +def index() -> str: + """首页路由""" return render_template('index.html') -@app.route('/Song_V1', methods=['GET', 'POST']) -def Song_v1(): - # 参数获取 - if request.method == 'GET': - song_ids = request.args.get('ids') - url = request.args.get('url') - level = request.args.get('level') - type_ = request.args.get('type') - else: - song_ids = request.form.get('ids') - url = request.form.get('url') - level = request.form.get('level') - type_ = request.form.get('type') - # 参数校验 - if not song_ids and not url: - return jsonify({'error': '必须提供 ids 或 url 参数'}), 400 - if not level: - return jsonify({'error': 'level参数为空'}), 400 - if not type_: - return jsonify({'error': 'type参数为空'}), 400 - - jsondata = song_ids if song_ids else url - cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie()) +@app.route('/health', methods=['GET']) +def health_check(): + """健康检查API""" try: - song_id = ids(jsondata) - urlv1 = url_v1(song_id, level, cookies) - if not urlv1['data'] or urlv1['data'][0]['url'] is None: - return jsonify({"status": 400, 'msg': '信息获取不完整!'}), 400 - namev1 = name_v1(urlv1['data'][0]['id']) - lyricv1 = lyric_v1(urlv1['data'][0]['id'], cookies) - song_data = urlv1['data'][0] - song_info = namev1['songs'][0] if namev1['songs'] else {} - song_url = song_data['url'] - song_name = song_info.get('name', '') - song_picUrl = song_info.get('al', {}).get('picUrl', '') - song_alname = song_info.get('al', {}).get('name', '') - # 歌手名拼接 - artist_names = [] - for song in namev1['songs']: - ar_list = song.get('ar', []) - if ar_list: - artist_names.append('/'.join(ar['name'] for ar in ar_list)) - song_arname = ', '.join(artist_names) - # 歌词 - lyric = lyricv1.get('lrc', {}).get('lyric', '') - tlyric = lyricv1.get('tlyric', {}).get('lyric', None) - except Exception as e: - return jsonify({'status': 500, 'msg': f'服务异常: {str(e)}'}), 500 - - # 响应类型 - if type_ == 'text': - data = f'歌曲名称:{song_name}
歌曲图片:{song_picUrl}
歌手:{song_arname}
歌曲专辑:{song_alname}
歌曲音质:{music_level1(song_data["level"])}
歌曲大小:{size(song_data["size"])}
音乐地址:{song_url}' - elif type_ == 'down': - data = redirect(song_url) - elif type_ == 'json': - data = { - "status": 200, - "name": song_name, - "pic": song_picUrl, - "ar_name": song_arname, - "al_name": song_alname, - "level": music_level1(song_data["level"]), - "size": size(song_data["size"]), - "url": song_url.replace("http://", "https://", 1), - "lyric": lyric, - "tlyric": tlyric + # 检查Cookie状态 + cookie_status = api_service.cookie_manager.is_cookie_valid() + + health_info = { + 'service': 'running', + 'timestamp': int(time.time()) if 'time' in sys.modules else None, + 'cookie_status': 'valid' if cookie_status else 'invalid', + 'downloads_dir': str(api_service.downloads_path.absolute()), + 'version': '2.0.0' } - data = jsonify(data) - else: - data = jsonify({"status": 400, 'msg': '解析失败!请检查参数是否完整!'}), 400 - return data - -@app.route('/Search', methods=['GET', 'POST']) -def search(): - if request.method == 'GET': - keywords = request.args.get('keywords') - limit = request.args.get('limit', default=10, type=int) - else: - keywords = request.form.get('keywords') - limit = int(request.form.get('limit', 10)) - if not keywords: - return jsonify({'error': '必须提供 keywords 参数'}), 400 - cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie()) - try: - songs = search_music(keywords, cookies, limit=limit) - return jsonify({'status': 200, 'result': songs}) + + return APIResponse.success(health_info, "API服务运行正常") + except Exception as e: - return jsonify({'status': 500, 'msg': f'搜索异常: {str(e)}'}), 500 + api_service.logger.error(f"健康检查失败: {e}") + return APIResponse.error(f"健康检查失败: {str(e)}", 500) -@app.route('/Playlist', methods=['GET', 'POST']) -def playlist(): - if request.method == 'GET': - playlist_id = request.args.get('id') - else: - playlist_id = request.form.get('id') - if not playlist_id: - return jsonify({'error': '必须提供歌单id参数'}), 400 - cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie()) + +@app.route('/song', methods=['GET', 'POST']) +@app.route('/Song_V1', methods=['GET', 'POST']) # 向后兼容 +def get_song_info(): + """获取歌曲信息API""" try: - info = playlist_detail(playlist_id, cookies) - return jsonify({'status': 200, 'playlist': info}) + # 获取请求参数 + data = api_service._safe_get_request_data() + song_ids = data.get('ids') or data.get('id') + url = data.get('url') + level = data.get('level', 'lossless') + info_type = data.get('type', 'url') + + # 参数验证 + if not song_ids and not url: + return APIResponse.error("必须提供 'ids'、'id' 或 'url' 参数") + + # 提取音乐ID + music_id = api_service._extract_music_id(song_ids or url) + + # 验证音质参数 + valid_levels = ['standard', 'exhigh', 'lossless', 'hires', 'sky', 'jyeffect', 'jymaster'] + if level not in valid_levels: + return APIResponse.error(f"无效的音质参数,支持: {', '.join(valid_levels)}") + + # 验证类型参数 + valid_types = ['url', 'name', 'lyric', 'json'] + if info_type not in valid_types: + return APIResponse.error(f"无效的类型参数,支持: {', '.join(valid_types)}") + + cookies = api_service._get_cookies() + + # 根据类型获取不同信息 + if info_type == 'url': + result = url_v1(music_id, level, cookies) + if result and result.get('data') and len(result['data']) > 0: + song_data = result['data'][0] + response_data = { + 'id': song_data.get('id'), + 'url': song_data.get('url'), + 'level': song_data.get('level'), + 'quality_name': api_service._get_quality_display_name(song_data.get('level', level)), + 'size': song_data.get('size'), + 'size_formatted': api_service._format_file_size(song_data.get('size', 0)), + 'type': song_data.get('type'), + 'bitrate': song_data.get('br') + } + return APIResponse.success(response_data, "获取歌曲URL成功") + else: + return APIResponse.error("获取音乐URL失败,可能是版权限制或音质不支持", 404) + + elif info_type == 'name': + result = name_v1(music_id) + return APIResponse.success(result, "获取歌曲信息成功") + + elif info_type == 'lyric': + result = lyric_v1(music_id, cookies) + return APIResponse.success(result, "获取歌词成功") + + elif info_type == 'json': + # 获取完整的歌曲信息(用于前端解析) + song_info = name_v1(music_id) + url_info = url_v1(music_id, level, cookies) + lyric_info = lyric_v1(music_id, cookies) + + if not song_info or 'songs' not in song_info or not song_info['songs']: + return APIResponse.error("未找到歌曲信息", 404) + + song_data = song_info['songs'][0] + + # 构建前端期望的响应格式 + response_data = { + 'id': music_id, + 'name': song_data.get('name', ''), + 'ar_name': ', '.join(artist['name'] for artist in song_data.get('ar', [])), + 'al_name': song_data.get('al', {}).get('name', ''), + 'pic': song_data.get('al', {}).get('picUrl', ''), + 'level': level, + 'lyric': lyric_info.get('lrc', {}).get('lyric', '') if lyric_info else '', + 'tlyric': lyric_info.get('tlyric', {}).get('lyric', '') if lyric_info else '' + } + + # 添加URL和大小信息 + if url_info and url_info.get('data') and len(url_info['data']) > 0: + url_data = url_info['data'][0] + response_data.update({ + 'url': url_data.get('url', ''), + 'size': api_service._format_file_size(url_data.get('size', 0)), + 'level': url_data.get('level', level) + }) + else: + response_data.update({ + 'url': '', + 'size': '获取失败' + }) + + return APIResponse.success(response_data, "获取歌曲信息成功") + + except APIException as e: + api_service.logger.error(f"API调用失败: {e}") + return APIResponse.error(f"API调用失败: {str(e)}", 500) except Exception as e: - return jsonify({'status': 500, 'msg': f'歌单解析异常: {str(e)}'}), 500 + api_service.logger.error(f"获取歌曲信息异常: {e}\n{traceback.format_exc()}") + return APIResponse.error(f"服务器错误: {str(e)}", 500) -@app.route('/Album', methods=['GET', 'POST']) -def album(): - if request.method == 'GET': - album_id = request.args.get('id') - else: - album_id = request.form.get('id') - if not album_id: - return jsonify({'error': '必须提供专辑id参数'}), 400 - cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie()) + +@app.route('/search', methods=['GET', 'POST']) +@app.route('/Search', methods=['GET', 'POST']) # 向后兼容 +def search_music_api(): + """搜索音乐API""" try: - info = album_detail(album_id, cookies) - return jsonify({'status': 200, 'album': info}) + # 获取请求参数 + data = api_service._safe_get_request_data() + keyword = data.get('keyword') or data.get('keywords') or data.get('q') + limit = int(data.get('limit', 30)) + offset = int(data.get('offset', 0)) + search_type = data.get('type', '1') # 1-歌曲, 10-专辑, 100-歌手, 1000-歌单 + + # 参数验证 + validation_error = api_service._validate_request_params({'keyword': keyword}) + if validation_error: + return validation_error + + # 限制搜索数量 + if limit > 100: + limit = 100 + + cookies = api_service._get_cookies() + result = search_music(keyword, cookies, limit) + + # search_music返回的是歌曲列表,需要包装成前端期望的格式 + if result: + for song in result: + # 添加艺术家字符串(如果需要) + if 'artists' in song: + song['artist_string'] = song['artists'] + + return APIResponse.success(result, "搜索完成") + + except ValueError as e: + return APIResponse.error(f"参数格式错误: {str(e)}") except Exception as e: - return jsonify({'status': 500, 'msg': f'专辑解析异常: {str(e)}'}), 500 + api_service.logger.error(f"搜索音乐异常: {e}\n{traceback.format_exc()}") + return APIResponse.error(f"搜索失败: {str(e)}", 500) -# ================= 命令行启动 ================= -def start_gui(url: str = None, level: str = 'lossless'): - if url: - print(f"正在处理 URL: {url},音质:{level}") - cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie()) - try: - song_ids = ids(url) - urlv1 = url_v1(song_ids, level, cookies) - namev1 = name_v1(urlv1['data'][0]['id']) - lyricv1 = lyric_v1(urlv1['data'][0]['id'], cookies) - song_info = namev1['songs'][0] - song_name = song_info['name'] - song_pic = song_info['al']['picUrl'] - artist_names = ', '.join(artist['name'] for artist in song_info['ar']) - album_name = song_info['al']['name'] - music_quality = music_level1(urlv1['data'][0]['level']) - file_size = size(urlv1['data'][0]['size']) - music_url = urlv1['data'][0]['url'] - lyrics = lyricv1.get('lrc', {}).get('lyric', '') - translated_lyrics = lyricv1.get('tlyric', {}).get('lyric', None) - output_text = f""" - 歌曲名称: {song_name} - 歌曲图片: {song_pic} - 歌手: {artist_names} - 专辑名称: {album_name} - 音质: {music_quality} - 大小: {file_size} - 音乐链接: {music_url} - 歌词: {lyrics} - 翻译歌词: {translated_lyrics if translated_lyrics else '没有翻译歌词'} - """ - print(output_text) - except Exception as e: - print(f"发生错误: {e}") - else: - print("没有提供 URL 参数") -def start_api(): - app.run(host='0.0.0.0', port=5000, debug=False) +@app.route('/playlist', methods=['GET', 'POST']) +@app.route('/Playlist', methods=['GET', 'POST']) # 向后兼容 +def get_playlist(): + """获取歌单详情API""" + try: + # 获取请求参数 + data = api_service._safe_get_request_data() + playlist_id = data.get('id') + + # 参数验证 + validation_error = api_service._validate_request_params({'playlist_id': playlist_id}) + if validation_error: + return validation_error + + cookies = api_service._get_cookies() + result = playlist_detail(playlist_id, cookies) + + # 适配前端期望的响应格式 + response_data = { + 'status': 'success', + 'playlist': result + } + + return APIResponse.success(response_data, "获取歌单详情成功") + + except Exception as e: + api_service.logger.error(f"获取歌单异常: {e}\n{traceback.format_exc()}") + return APIResponse.error(f"获取歌单失败: {str(e)}", 500) + + +@app.route('/album', methods=['GET', 'POST']) +@app.route('/Album', methods=['GET', 'POST']) # 向后兼容 +def get_album(): + """获取专辑详情API""" + try: + # 获取请求参数 + data = api_service._safe_get_request_data() + album_id = data.get('id') + + # 参数验证 + validation_error = api_service._validate_request_params({'album_id': album_id}) + if validation_error: + return validation_error + + cookies = api_service._get_cookies() + result = album_detail(album_id, cookies) + + # 适配前端期望的响应格式 + response_data = { + 'status': 200, + 'album': result + } + + return APIResponse.success(response_data, "获取专辑详情成功") + + except Exception as e: + api_service.logger.error(f"获取专辑异常: {e}\n{traceback.format_exc()}") + return APIResponse.error(f"获取专辑失败: {str(e)}", 500) + + +@app.route('/download', methods=['GET', 'POST']) +@app.route('/Download', methods=['GET', 'POST']) # 向后兼容 +def download_music_api(): + """下载音乐API""" + try: + # 获取请求参数 + data = api_service._safe_get_request_data() + music_id = data.get('id') + quality = data.get('quality', 'lossless') + return_format = data.get('format', 'file') # file 或 json + + # 参数验证 + validation_error = api_service._validate_request_params({'music_id': music_id}) + if validation_error: + return validation_error + + # 验证音质参数 + valid_qualities = ['standard', 'exhigh', 'lossless', 'hires', 'sky', 'jyeffect', 'jymaster'] + if quality not in valid_qualities: + return APIResponse.error(f"无效的音质参数,支持: {', '.join(valid_qualities)}") + + # 验证返回格式 + if return_format not in ['file', 'json']: + return APIResponse.error("返回格式只支持 'file' 或 'json'") + + music_id = api_service._extract_music_id(music_id) + cookies = api_service._get_cookies() + + # 获取音乐基本信息 + song_info = name_v1(music_id) + if not song_info or 'songs' not in song_info or not song_info['songs']: + return APIResponse.error("未找到音乐信息", 404) + + # 获取音乐下载链接 + url_info = url_v1(music_id, quality, cookies) + if not url_info or 'data' not in url_info or not url_info['data'] or not url_info['data'][0].get('url'): + return APIResponse.error("无法获取音乐下载链接,可能是版权限制或音质不支持", 404) + + # 构建音乐信息 + song_data = song_info['songs'][0] + url_data = url_info['data'][0] + + music_info = { + 'id': music_id, + 'name': song_data['name'], + 'artist_string': ', '.join(artist['name'] for artist in song_data['ar']), + 'album': song_data['al']['name'], + 'pic_url': song_data['al']['picUrl'], + 'file_type': url_data['type'], + 'file_size': url_data['size'], + 'duration': song_data.get('dt', 0), + 'download_url': url_data['url'] + } + + # 生成安全文件名 + safe_name = f"{music_info['name']} [{quality}]" + safe_name = ''.join(c for c in safe_name if c not in r'<>:"/\|?*') + filename = f"{safe_name}.{music_info['file_type']}" + + file_path = api_service.downloads_path / filename + + # 检查文件是否已存在 + if file_path.exists(): + api_service.logger.info(f"文件已存在: {filename}") + else: + # 使用优化后的下载器下载 + try: + download_result = api_service.downloader.download_music_file( + music_id, quality + ) + + if not download_result.success: + return APIResponse.error(f"下载失败: {download_result.error_message}", 500) + + file_path = Path(download_result.file_path) + api_service.logger.info(f"下载完成: {filename}") + + except DownloadException as e: + api_service.logger.error(f"下载异常: {e}") + return APIResponse.error(f"下载失败: {str(e)}", 500) + + # 根据返回格式返回结果 + if return_format == 'json': + response_data = { + 'music_id': music_id, + 'name': music_info['name'], + 'artist': music_info['artist_string'], + 'album': music_info['album'], + 'quality': quality, + 'quality_name': api_service._get_quality_display_name(quality), + 'file_type': music_info['file_type'], + 'file_size': music_info['file_size'], + 'file_size_formatted': api_service._format_file_size(music_info['file_size']), + 'file_path': str(file_path.absolute()), + 'filename': filename, + 'duration': music_info['duration'] + } + return APIResponse.success(response_data, "下载完成") + else: + # 返回文件下载 + if not file_path.exists(): + return APIResponse.error("文件不存在", 404) + + try: + response = send_file( + str(file_path), + as_attachment=True, + download_name=filename, + mimetype=f"audio/{music_info['file_type']}" + ) + response.headers['X-Download-Message'] = 'Download completed successfully' + response.headers['X-Download-Filename'] = quote(filename, safe='') + return response + except Exception as e: + api_service.logger.error(f"发送文件失败: {e}") + return APIResponse.error(f"文件发送失败: {str(e)}", 500) + + except Exception as e: + api_service.logger.error(f"下载音乐异常: {e}\n{traceback.format_exc()}") + return APIResponse.error(f"下载异常: {str(e)}", 500) + + +@app.route('/api/info', methods=['GET']) +def api_info(): + """API信息接口""" + try: + info = { + 'name': '网易云音乐API服务', + 'version': '2.0.0', + 'description': '提供网易云音乐相关API服务', + 'endpoints': { + '/health': 'GET - 健康检查', + '/song': 'GET/POST - 获取歌曲信息', + '/search': 'GET/POST - 搜索音乐', + '/playlist': 'GET/POST - 获取歌单详情', + '/album': 'GET/POST - 获取专辑详情', + '/download': 'GET/POST - 下载音乐', + '/api/info': 'GET - API信息' + }, + 'supported_qualities': [ + 'standard', 'exhigh', 'lossless', + 'hires', 'sky', 'jyeffect', 'jymaster' + ], + 'config': { + 'downloads_dir': str(api_service.downloads_path.absolute()), + 'max_file_size': f"{config.max_file_size // (1024*1024)}MB", + 'request_timeout': f"{config.request_timeout}s" + } + } + + return APIResponse.success(info, "API信息获取成功") + + except Exception as e: + api_service.logger.error(f"获取API信息异常: {e}") + return APIResponse.error(f"获取API信息失败: {str(e)}", 500) + + +def start_api_server(): + """启动API服务器""" + try: + import time + + print("\n" + "="*60) + print("🚀 网易云音乐API服务启动中...") + print("="*60) + print(f"📡 服务地址: http://{config.host}:{config.port}") + print(f"📁 下载目录: {api_service.downloads_path.absolute()}") + print(f"📋 日志级别: {config.log_level}") + print("\n📚 API端点:") + print(f" ├─ GET /health - 健康检查") + print(f" ├─ POST /song - 获取歌曲信息") + print(f" ├─ POST /search - 搜索音乐") + print(f" ├─ POST /playlist - 获取歌单详情") + print(f" ├─ POST /album - 获取专辑详情") + print(f" ├─ POST /download - 下载音乐") + print(f" └─ GET /api/info - API信息") + print("\n🎵 支持的音质:") + print(f" standard, exhigh, lossless, hires, sky, jyeffect, jymaster") + print("="*60) + print(f"⏰ 启动时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") + print("🌟 服务已就绪,等待请求...\n") + + # 启动Flask应用 + app.run( + host=config.host, + port=config.port, + debug=config.debug, + threaded=True + ) + + except KeyboardInterrupt: + print("\n\n👋 服务已停止") + except Exception as e: + api_service.logger.error(f"启动服务失败: {e}") + print(f"❌ 启动失败: {e}") + sys.exit(1) + if __name__ == '__main__': - parser = argparse.ArgumentParser(description="启动 API 或 GUI") - parser.add_argument('--mode', choices=['api', 'gui'], help="选择启动模式:api 或 gui") - parser.add_argument('--url', help="提供 URL 参数供 GUI 模式使用") - parser.add_argument('--level', default='lossless', choices=['standard', 'exhigh', 'lossless', 'hires', 'sky', 'jyeffect', 'jymaster'], help="选择音质等级,默认是 lossless") - args = parser.parse_args() - - if args.mode == 'api': - start_api() - elif args.mode == 'gui': - start_gui(args.url, args.level) + start_api_server() diff --git a/music_api.py b/music_api.py index f7eefb0..4cac887 100644 --- a/music_api.py +++ b/music_api.py @@ -1,360 +1,673 @@ -import json -import urllib.parse -from random import randrange -import requests -from hashlib import md5 -from cryptography.hazmat.primitives import padding -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - -def HexDigest(data): - return "".join([hex(d)[2:].zfill(2) for d in data]) - -def HashDigest(text): - HASH = md5(text.encode("utf-8")) - return HASH.digest() - -def HashHexDigest(text): - return HexDigest(HashDigest(text)) - -def post(url, params, cookie): - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36 Chrome/91.0.4472.164 NeteaseMusicDesktop/2.10.2.200154', - 'Referer': '', - } - cookies = { - "os": "pc", - "appver": "", - "osver": "", - "deviceId": "pyncm!" - } - cookies.update(cookie) - response = requests.post(url, headers=headers, cookies=cookies, data={"params": params}) - return response.text - -def posts(url, params, cookie): - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36 Chrome/91.0.4472.164 NeteaseMusicDesktop/2.10.2.200154', - 'Referer': '', - } - cookies = { - "os": "pc", - "appver": "", - "osver": "", - "deviceId": "pyncm!" - } - cookies.update(cookie) - response = requests.post(url, headers=headers, cookies=cookies, data={"params": params}) - return response # 修改为返回完整的 response 对象 - -def url_v1(id, level, cookies): - url = "https://interface3.music.163.com/eapi/song/enhance/player/url/v1" - AES_KEY = b"e82ckenh8dichen8" - config = { - "os": "pc", - "appver": "", - "osver": "", - "deviceId": "pyncm!", - "requestId": str(randrange(20000000, 30000000)) - } - - payload = { - 'ids': [id], - 'level': level, - 'encodeType': 'flac', - 'header': json.dumps(config), - } - - if level == 'sky': - payload['immerseType'] = 'c51' - - url2 = urllib.parse.urlparse(url).path.replace("/eapi/", "/api/") - digest = HashHexDigest(f"nobody{url2}use{json.dumps(payload)}md5forencrypt") - params = f"{url2}-36cd479b6b5-{json.dumps(payload)}-36cd479b6b5-{digest}" - padder = padding.PKCS7(algorithms.AES(AES_KEY).block_size).padder() - padded_data = padder.update(params.encode()) + padder.finalize() - cipher = Cipher(algorithms.AES(AES_KEY), modes.ECB()) - encryptor = cipher.encryptor() - enc = encryptor.update(padded_data) + encryptor.finalize() - params = HexDigest(enc) - response = post(url, params, cookies) - return json.loads(response) - -def name_v1(id): - urls = "https://interface3.music.163.com/api/v3/song/detail" - data = {'c': json.dumps([{"id":id,"v":0}])} - response = requests.post(url=urls, data=data) - return response.json() - -def lyric_v1(id, cookies): - url = "https://interface3.music.163.com/api/song/lyric" - data = {'id': id, 'cp': 'false', 'tv': '0', 'lv': '0', 'rv': '0', 'kv': '0', 'yv': '0', 'ytv': '0', 'yrv': '0'} - response = requests.post(url=url, data=data, cookies=cookies) - return response.json() - -def search_music(keywords, cookies, limit=10): - """ - 网易云音乐搜索接口,返回歌曲信息列表 - :param keywords: 搜索关键词 - :param cookies: 登录 cookies - :param limit: 返回数量 - :return: 歌曲信息列表 - """ - url = 'https://music.163.com/api/cloudsearch/pc' - data = {'s': keywords, 'type': 1, 'limit': limit} - headers = { - 'User-Agent': 'Mozilla/5.0', - 'Referer': 'https://music.163.com/' - } - response = requests.post(url, data=data, headers=headers, cookies=cookies) - result = response.json() - songs = [] - for item in result.get('result', {}).get('songs', []): - song_info = { - 'id': item['id'], - 'name': item['name'], - 'artists': '/'.join(artist['name'] for artist in item['ar']), - 'album': item['al']['name'], - 'picUrl': item['al']['picUrl'] - } - songs.append(song_info) - return songs - -def playlist_detail(playlist_id, cookies): - """ - 获取网易云歌单详情及全部歌曲列表 - :param playlist_id: 歌单ID - :param cookies: 登录 cookies - :return: 歌单基本信息和全部歌曲列表 - """ - url = f'https://music.163.com/api/v6/playlist/detail' - data = {'id': playlist_id} - headers = { - 'User-Agent': 'Mozilla/5.0', - 'Referer': 'https://music.163.com/' - } - response = requests.post(url, data=data, headers=headers, cookies=cookies) - result = response.json() - playlist = result.get('playlist', {}) - info = { - 'id': playlist.get('id'), - 'name': playlist.get('name'), - 'coverImgUrl': playlist.get('coverImgUrl'), - 'creator': playlist.get('creator', {}).get('nickname', ''), - 'trackCount': playlist.get('trackCount'), - 'description': playlist.get('description', ''), - 'tracks': [] - } - # 获取所有trackIds - track_ids = [str(t['id']) for t in playlist.get('trackIds', [])] - # 分批获取详细信息(每批最多100首) - for i in range(0, len(track_ids), 100): - batch_ids = track_ids[i:i+100] - song_detail_url = 'https://interface3.music.163.com/api/v3/song/detail' - song_data = {'c': json.dumps([{ 'id': int(sid), 'v': 0 } for sid in batch_ids])} - song_resp = requests.post(url=song_detail_url, data=song_data, headers=headers, cookies=cookies) - song_result = song_resp.json() - for song in song_result.get('songs', []): - info['tracks'].append({ - 'id': song['id'], - 'name': song['name'], - 'artists': '/'.join(artist['name'] for artist in song['ar']), - 'album': song['al']['name'], - 'picUrl': song['al']['picUrl'] - }) - return info - -def album_detail(album_id, cookies): - """ - 获取网易云专辑详情及全部歌曲列表 - :param album_id: 专辑ID - :param cookies: 登录 cookies - :return: 专辑基本信息和全部歌曲列表 - """ - url = f'https://music.163.com/api/v1/album/{album_id}' - headers = { - 'User-Agent': 'Mozilla/5.0', - 'Referer': 'https://music.163.com/' - } - response = requests.get(url, headers=headers, cookies=cookies) - result = response.json() - album = result.get('album', {}) - info = { - 'id': album.get('id'), - 'name': album.get('name'), - 'coverImgUrl': get_pic_url(album.get('pic')), - #'coverImgEncryptId': netease_encryptId(str(album.get('pic'))), - 'artist': album.get('artist', {}).get('name', ''), - 'publishTime': album.get('publishTime'), - 'description': album.get('description', ''), - 'songs': [] - } - for song in result.get('songs', []): - info['songs'].append({ - 'id': song['id'], - 'name': song['name'], - 'artists': '/'.join(artist['name'] for artist in song['ar']), - 'album': song['al']['name'], - 'picUrl': get_pic_url(song['al'].get('pic')) - }) - return info - -def netease_encryptId(id_str): - """ - 网易云加密图片ID算法(PHP移植版) - :param id_str: 歌曲/专辑/图片ID(字符串) - :return: 加密后的字符串 - """ - import base64 - magic = list('3go8&$8*3*3h0k(2)2') - song_id = list(id_str) - for i in range(len(song_id)): - song_id[i] = chr(ord(song_id[i]) ^ ord(magic[i % len(magic)])) - m = ''.join(song_id) - import hashlib - md5_bytes = hashlib.md5(m.encode('utf-8')).digest() - result = base64.b64encode(md5_bytes).decode('utf-8') - result = result.replace('/', '_').replace('+', '-') - return result - -def get_pic_url(pic_id, size=300): - """ - 获取网易云加密歌曲/专辑封面直链 - :param pic_id: 封面ID(数字或字符串) - :param size: 图片尺寸,默认300 - :return: url - """ - enc_id = netease_encryptId(str(pic_id)) - url = f'https://p3.music.126.net/{enc_id}/{pic_id}.jpg?param={size}y{size}' - return url - -def generate_qr_key(): - """ - 生成二维码的key - :return: key和unikey - """ - url = 'https://interface3.music.163.com/eapi/login/qrcode/unikey' - AES_KEY = b"e82ckenh8dichen8" - config = { - "os": "pc", - "appver": "", - "osver": "", - "deviceId": "pyncm!", - "requestId": str(randrange(20000000, 30000000)) - } - - payload = { - 'type': 1, - 'header': json.dumps(config) - } - - url2 = urllib.parse.urlparse(url).path.replace("/eapi/", "/api/") - digest = HashHexDigest(f"nobody{url2}use{json.dumps(payload)}md5forencrypt") - params = f"{url2}-36cd479b6b5-{json.dumps(payload)}-36cd479b6b5-{digest}" - padder = padding.PKCS7(algorithms.AES(AES_KEY).block_size).padder() - padded_data = padder.update(params.encode()) + padder.finalize() - cipher = Cipher(algorithms.AES(AES_KEY), modes.ECB()) - encryptor = cipher.encryptor() - enc = encryptor.update(padded_data) + encryptor.finalize() - params = HexDigest(enc) - - response = posts(url, params, {}) - result = json.loads(response.text) # 保持不变,因为 post 函数已修复 - if result['code'] == 200: - return result['unikey'] - return None - -def create_qr_login(): - """ - 创建登录二维码并在控制台显示 - :return: unikey用于检查登录状态 - """ - import qrcode - from qrcode.main import QRCode - import os - - unikey = generate_qr_key() - if not unikey: - print("生成二维码key失败") - return None - - # 创建二维码 - qr = QRCode() - qr.add_data(f'https://music.163.com/login?codekey={unikey}') - qr.make(fit=True) - - # 在控制台显示二维码 - qr.print_ascii(tty=True) - print("\n请使用网易云音乐APP扫描上方二维码登录") - return unikey - -def check_qr_login(unikey): - """ - 检查二维码登录状态 - :param unikey: 二维码key - :return: (登录状态, cookie字典) - """ - url = 'https://interface3.music.163.com/eapi/login/qrcode/client/login' - AES_KEY = b"e82ckenh8dichen8" - config = { - "os": "pc", - "appver": "", - "osver": "", - "deviceId": "pyncm!", - "requestId": str(randrange(20000000, 30000000)) - } - - payload = { - 'key': unikey, - 'type': 1, - 'header': json.dumps(config) - } - - url2 = urllib.parse.urlparse(url).path.replace("/eapi/", "/api/") - digest = HashHexDigest(f"nobody{url2}use{json.dumps(payload)}md5forencrypt") - params = f"{url2}-36cd479b6b5-{json.dumps(payload)}-36cd479b6b5-{digest}" - padder = padding.PKCS7(algorithms.AES(AES_KEY).block_size).padder() - padded_data = padder.update(params.encode()) + padder.finalize() - cipher = Cipher(algorithms.AES(AES_KEY), modes.ECB()) - encryptor = cipher.encryptor() - enc = encryptor.update(padded_data) + encryptor.finalize() - params = HexDigest(enc) - - response = posts(url, params, {}) - result = json.loads(response.text) - - cookie_dict = {} - #print("服务器返回结果:", result) - #print("Set-Cookie头:", response.headers.get('Set-Cookie', '')) - - if result['code'] == 803: - # 直接从响应的headers获取Set-Cookie - all_cookies = response.headers.get('Set-Cookie', '').split(', ') - for cookie_str in all_cookies: - if 'MUSIC_U=' in cookie_str: - cookie_dict['MUSIC_U'] = cookie_str.split('MUSIC_U=')[1].split(';')[0] - - return result['code'], cookie_dict - -def qr_login(): - """ - 完整的二维码登录流程 - :return: 成功返回cookie字典,失败返回None - """ - unikey = create_qr_login() - if not unikey: - return None - - import time - while True: - code, cookies = check_qr_login(unikey) - if code == 803: - print("\n登录成功!") - return 'MUSIC_U=' + cookies['MUSIC_U'] + ';os=pc;appver=8.9.70;' # 修复字符串拼接 - elif code == 801: - print("\r等待扫码...", end='') - elif code == 802: - print("\r扫码成功,请在手机上确认登录...", end='') - else: - print(f"\n登录失败,错误码:{code}") - return None - time.sleep(2) +"""网易云音乐API模块 + +提供网易云音乐相关API接口的封装,包括: +- 音乐URL获取 +- 歌曲详情获取 +- 歌词获取 +- 搜索功能 +- 歌单和专辑详情 +- 二维码登录 +""" + +import json +import urllib.parse +import time +from random import randrange +from typing import Dict, List, Optional, Tuple, Any +from hashlib import md5 +from enum import Enum + +import requests +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + +class QualityLevel(Enum): + """音质等级枚举""" + STANDARD = "standard" # 标准音质 + EXHIGH = "exhigh" # 极高音质 + LOSSLESS = "lossless" # 无损音质 + HIRES = "hires" # Hi-Res音质 + SKY = "sky" # 沉浸环绕声 + JYEFFECT = "jyeffect" # 高清环绕声 + JYMASTER = "jymaster" # 超清母带 + + +# 常量定义 +class APIConstants: + """API相关常量""" + AES_KEY = b"e82ckenh8dichen8" + USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36 Chrome/91.0.4472.164 NeteaseMusicDesktop/2.10.2.200154' + REFERER = 'https://music.163.com/' + + # API URLs + SONG_URL_V1 = "https://interface3.music.163.com/eapi/song/enhance/player/url/v1" + SONG_DETAIL_V3 = "https://interface3.music.163.com/api/v3/song/detail" + LYRIC_API = "https://interface3.music.163.com/api/song/lyric" + SEARCH_API = 'https://music.163.com/api/cloudsearch/pc' + PLAYLIST_DETAIL_API = 'https://music.163.com/api/v6/playlist/detail' + ALBUM_DETAIL_API = 'https://music.163.com/api/v1/album/' + QR_UNIKEY_API = 'https://interface3.music.163.com/eapi/login/qrcode/unikey' + QR_LOGIN_API = 'https://interface3.music.163.com/eapi/login/qrcode/client/login' + + # 默认配置 + DEFAULT_CONFIG = { + "os": "pc", + "appver": "", + "osver": "", + "deviceId": "pyncm!" + } + + DEFAULT_COOKIES = { + "os": "pc", + "appver": "", + "osver": "", + "deviceId": "pyncm!" + } + + +class CryptoUtils: + """加密工具类""" + + @staticmethod + def hex_digest(data: bytes) -> str: + """将字节数据转换为十六进制字符串""" + return "".join([hex(d)[2:].zfill(2) for d in data]) + + @staticmethod + def hash_digest(text: str) -> bytes: + """计算MD5哈希值""" + return md5(text.encode("utf-8")).digest() + + @staticmethod + def hash_hex_digest(text: str) -> str: + """计算MD5哈希值并转换为十六进制字符串""" + return CryptoUtils.hex_digest(CryptoUtils.hash_digest(text)) + + @staticmethod + def encrypt_params(url: str, payload: Dict[str, Any]) -> str: + """加密请求参数""" + url_path = urllib.parse.urlparse(url).path.replace("/eapi/", "/api/") + digest = CryptoUtils.hash_hex_digest(f"nobody{url_path}use{json.dumps(payload)}md5forencrypt") + params = f"{url_path}-36cd479b6b5-{json.dumps(payload)}-36cd479b6b5-{digest}" + + # AES加密 + padder = padding.PKCS7(algorithms.AES(APIConstants.AES_KEY).block_size).padder() + padded_data = padder.update(params.encode()) + padder.finalize() + cipher = Cipher(algorithms.AES(APIConstants.AES_KEY), modes.ECB()) + encryptor = cipher.encryptor() + enc = encryptor.update(padded_data) + encryptor.finalize() + + return CryptoUtils.hex_digest(enc) + + +class HTTPClient: + """HTTP客户端类""" + + @staticmethod + def post_request(url: str, params: str, cookies: Dict[str, str]) -> str: + """发送POST请求并返回文本响应""" + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER, + } + + request_cookies = APIConstants.DEFAULT_COOKIES.copy() + request_cookies.update(cookies) + + try: + response = requests.post(url, headers=headers, cookies=request_cookies, + data={"params": params}, timeout=30) + response.raise_for_status() + return response.text + except requests.RequestException as e: + raise APIException(f"HTTP请求失败: {e}") + + @staticmethod + def post_request_full(url: str, params: str, cookies: Dict[str, str]) -> requests.Response: + """发送POST请求并返回完整响应对象""" + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER, + } + + request_cookies = APIConstants.DEFAULT_COOKIES.copy() + request_cookies.update(cookies) + + try: + response = requests.post(url, headers=headers, cookies=request_cookies, + data={"params": params}, timeout=30) + response.raise_for_status() + return response + except requests.RequestException as e: + raise APIException(f"HTTP请求失败: {e}") + + +class APIException(Exception): + """API异常类""" + pass + + +class NeteaseAPI: + """网易云音乐API主类""" + + def __init__(self): + self.http_client = HTTPClient() + self.crypto_utils = CryptoUtils() + + def get_song_url(self, song_id: int, quality: str, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌曲播放URL + + Args: + song_id: 歌曲ID + quality: 音质等级 (standard, exhigh, lossless, hires, sky, jyeffect, jymaster) + cookies: 用户cookies + + Returns: + 包含歌曲URL信息的字典 + + Raises: + APIException: API调用失败时抛出 + """ + try: + config = APIConstants.DEFAULT_CONFIG.copy() + config["requestId"] = str(randrange(20000000, 30000000)) + + payload = { + 'ids': [song_id], + 'level': quality, + 'encodeType': 'flac', + 'header': json.dumps(config), + } + + if quality == 'sky': + payload['immerseType'] = 'c51' + + params = self.crypto_utils.encrypt_params(APIConstants.SONG_URL_V1, payload) + response_text = self.http_client.post_request(APIConstants.SONG_URL_V1, params, cookies) + + result = json.loads(response_text) + if result.get('code') != 200: + raise APIException(f"获取歌曲URL失败: {result.get('message', '未知错误')}") + + return result + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析响应数据失败: {e}") + + def get_song_detail(self, song_id: int) -> Dict[str, Any]: + """获取歌曲详细信息 + + Args: + song_id: 歌曲ID + + Returns: + 包含歌曲详细信息的字典 + + Raises: + APIException: API调用失败时抛出 + """ + try: + data = {'c': json.dumps([{"id": song_id, "v": 0}])} + response = requests.post(APIConstants.SONG_DETAIL_V3, data=data, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"获取歌曲详情失败: {result.get('message', '未知错误')}") + + return result + except requests.RequestException as e: + raise APIException(f"获取歌曲详情请求失败: {e}") + except json.JSONDecodeError as e: + raise APIException(f"解析歌曲详情响应失败: {e}") + + def get_lyric(self, song_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌词信息 + + Args: + song_id: 歌曲ID + cookies: 用户cookies + + Returns: + 包含歌词信息的字典 + + Raises: + APIException: API调用失败时抛出 + """ + try: + data = { + 'id': song_id, + 'cp': 'false', + 'tv': '0', + 'lv': '0', + 'rv': '0', + 'kv': '0', + 'yv': '0', + 'ytv': '0', + 'yrv': '0' + } + + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER + } + + response = requests.post(APIConstants.LYRIC_API, data=data, + headers=headers, cookies=cookies, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"获取歌词失败: {result.get('message', '未知错误')}") + + return result + except requests.RequestException as e: + raise APIException(f"获取歌词请求失败: {e}") + except json.JSONDecodeError as e: + raise APIException(f"解析歌词响应失败: {e}") + + def search_music(self, keywords: str, cookies: Dict[str, str], limit: int = 10) -> List[Dict[str, Any]]: + """搜索音乐 + + Args: + keywords: 搜索关键词 + cookies: 用户cookies + limit: 返回数量限制 + + Returns: + 歌曲信息列表 + + Raises: + APIException: API调用失败时抛出 + """ + try: + data = {'s': keywords, 'type': 1, 'limit': limit} + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER + } + + response = requests.post(APIConstants.SEARCH_API, data=data, + headers=headers, cookies=cookies, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"搜索失败: {result.get('message', '未知错误')}") + + songs = [] + for item in result.get('result', {}).get('songs', []): + song_info = { + 'id': item['id'], + 'name': item['name'], + 'artists': '/'.join(artist['name'] for artist in item['ar']), + 'album': item['al']['name'], + 'picUrl': item['al']['picUrl'] + } + songs.append(song_info) + + return songs + except requests.RequestException as e: + raise APIException(f"搜索请求失败: {e}") + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析搜索响应失败: {e}") + + def get_playlist_detail(self, playlist_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌单详情 + + Args: + playlist_id: 歌单ID + cookies: 用户cookies + + Returns: + 歌单详情信息 + + Raises: + APIException: API调用失败时抛出 + """ + try: + data = {'id': playlist_id} + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER + } + + response = requests.post(APIConstants.PLAYLIST_DETAIL_API, data=data, + headers=headers, cookies=cookies, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"获取歌单详情失败: {result.get('message', '未知错误')}") + + playlist = result.get('playlist', {}) + info = { + 'id': playlist.get('id'), + 'name': playlist.get('name'), + 'coverImgUrl': playlist.get('coverImgUrl'), + 'creator': playlist.get('creator', {}).get('nickname', ''), + 'trackCount': playlist.get('trackCount'), + 'description': playlist.get('description', ''), + 'tracks': [] + } + + # 获取所有trackIds并分批获取详细信息 + track_ids = [str(t['id']) for t in playlist.get('trackIds', [])] + for i in range(0, len(track_ids), 100): + batch_ids = track_ids[i:i+100] + song_data = {'c': json.dumps([{'id': int(sid), 'v': 0} for sid in batch_ids])} + + song_resp = requests.post(APIConstants.SONG_DETAIL_V3, data=song_data, + headers=headers, cookies=cookies, timeout=30) + song_resp.raise_for_status() + + song_result = song_resp.json() + for song in song_result.get('songs', []): + info['tracks'].append({ + 'id': song['id'], + 'name': song['name'], + 'artists': '/'.join(artist['name'] for artist in song['ar']), + 'album': song['al']['name'], + 'picUrl': song['al']['picUrl'] + }) + + return info + except requests.RequestException as e: + raise APIException(f"获取歌单详情请求失败: {e}") + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析歌单详情响应失败: {e}") + + def get_album_detail(self, album_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取专辑详情 + + Args: + album_id: 专辑ID + cookies: 用户cookies + + Returns: + 专辑详情信息 + + Raises: + APIException: API调用失败时抛出 + """ + try: + url = f'{APIConstants.ALBUM_DETAIL_API}{album_id}' + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER + } + + response = requests.get(url, headers=headers, cookies=cookies, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"获取专辑详情失败: {result.get('message', '未知错误')}") + + album = result.get('album', {}) + info = { + 'id': album.get('id'), + 'name': album.get('name'), + 'coverImgUrl': self.get_pic_url(album.get('pic')), + 'artist': album.get('artist', {}).get('name', ''), + 'publishTime': album.get('publishTime'), + 'description': album.get('description', ''), + 'songs': [] + } + + for song in result.get('songs', []): + info['songs'].append({ + 'id': song['id'], + 'name': song['name'], + 'artists': '/'.join(artist['name'] for artist in song['ar']), + 'album': song['al']['name'], + 'picUrl': self.get_pic_url(song['al'].get('pic')) + }) + + return info + except requests.RequestException as e: + raise APIException(f"获取专辑详情请求失败: {e}") + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析专辑详情响应失败: {e}") + + def netease_encrypt_id(self, id_str: str) -> str: + """网易云加密图片ID算法 + + Args: + id_str: 图片ID字符串 + + Returns: + 加密后的字符串 + """ + import base64 + import hashlib + + magic = list('3go8&$8*3*3h0k(2)2') + song_id = list(id_str) + + for i in range(len(song_id)): + song_id[i] = chr(ord(song_id[i]) ^ ord(magic[i % len(magic)])) + + m = ''.join(song_id) + md5_bytes = hashlib.md5(m.encode('utf-8')).digest() + result = base64.b64encode(md5_bytes).decode('utf-8') + result = result.replace('/', '_').replace('+', '-') + + return result + + def get_pic_url(self, pic_id: Optional[int], size: int = 300) -> str: + """获取网易云加密歌曲/专辑封面直链 + + Args: + pic_id: 封面ID + size: 图片尺寸 + + Returns: + 图片URL + """ + if pic_id is None: + return '' + + enc_id = self.netease_encrypt_id(str(pic_id)) + return f'https://p3.music.126.net/{enc_id}/{pic_id}.jpg?param={size}y{size}' + + +class QRLoginManager: + """二维码登录管理器""" + + def __init__(self): + self.http_client = HTTPClient() + self.crypto_utils = CryptoUtils() + + def generate_qr_key(self) -> Optional[str]: + """生成二维码的key + + Returns: + 成功返回unikey,失败返回None + + Raises: + APIException: API调用失败时抛出 + """ + try: + config = APIConstants.DEFAULT_CONFIG.copy() + config["requestId"] = str(randrange(20000000, 30000000)) + + payload = { + 'type': 1, + 'header': json.dumps(config) + } + + params = self.crypto_utils.encrypt_params(APIConstants.QR_UNIKEY_API, payload) + response = self.http_client.post_request_full(APIConstants.QR_UNIKEY_API, params, {}) + + result = json.loads(response.text) + if result.get('code') == 200: + return result.get('unikey') + else: + raise APIException(f"生成二维码key失败: {result.get('message', '未知错误')}") + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析二维码key响应失败: {e}") + + def create_qr_login(self) -> Optional[str]: + """创建登录二维码并在控制台显示 + + Returns: + 成功返回unikey,失败返回None + """ + try: + import qrcode + + unikey = self.generate_qr_key() + if not unikey: + print("生成二维码key失败") + return None + + # 创建二维码 + qr = qrcode.QRCode() + qr.add_data(f'https://music.163.com/login?codekey={unikey}') + qr.make(fit=True) + + # 在控制台显示二维码 + qr.print_ascii(tty=True) + print("\n请使用网易云音乐APP扫描上方二维码登录") + return unikey + except ImportError: + print("请安装qrcode库: pip install qrcode") + return None + except Exception as e: + print(f"创建二维码失败: {e}") + return None + + def check_qr_login(self, unikey: str) -> Tuple[int, Dict[str, str]]: + """检查二维码登录状态 + + Args: + unikey: 二维码key + + Returns: + (登录状态码, cookie字典) + + Raises: + APIException: API调用失败时抛出 + """ + try: + config = APIConstants.DEFAULT_CONFIG.copy() + config["requestId"] = str(randrange(20000000, 30000000)) + + payload = { + 'key': unikey, + 'type': 1, + 'header': json.dumps(config) + } + + params = self.crypto_utils.encrypt_params(APIConstants.QR_LOGIN_API, payload) + response = self.http_client.post_request_full(APIConstants.QR_LOGIN_API, params, {}) + + result = json.loads(response.text) + cookie_dict = {} + + if result.get('code') == 803: + # 登录成功,提取cookie + all_cookies = response.headers.get('Set-Cookie', '').split(', ') + for cookie_str in all_cookies: + if 'MUSIC_U=' in cookie_str: + cookie_dict['MUSIC_U'] = cookie_str.split('MUSIC_U=')[1].split(';')[0] + + return result.get('code', -1), cookie_dict + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析登录状态响应失败: {e}") + + def qr_login(self) -> Optional[str]: + """完整的二维码登录流程 + + Returns: + 成功返回cookie字符串,失败返回None + """ + try: + unikey = self.create_qr_login() + if not unikey: + return None + + while True: + code, cookies = self.check_qr_login(unikey) + + if code == 803: + print("\n登录成功!") + return f"MUSIC_U={cookies['MUSIC_U']};os=pc;appver=8.9.70;" + elif code == 801: + print("\r等待扫码...", end='') + elif code == 802: + print("\r扫码成功,请在手机上确认登录...", end='') + else: + print(f"\n登录失败,错误码:{code}") + return None + + time.sleep(2) + except KeyboardInterrupt: + print("\n用户取消登录") + return None + except Exception as e: + print(f"\n登录过程中发生错误: {e}") + return None + + +# 向后兼容的函数接口 +def url_v1(song_id: int, level: str, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌曲URL(向后兼容)""" + api = NeteaseAPI() + return api.get_song_url(song_id, level, cookies) + + +def name_v1(song_id: int) -> Dict[str, Any]: + """获取歌曲详情(向后兼容)""" + api = NeteaseAPI() + return api.get_song_detail(song_id) + + +def lyric_v1(song_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌词(向后兼容)""" + api = NeteaseAPI() + return api.get_lyric(song_id, cookies) + + +def search_music(keywords: str, cookies: Dict[str, str], limit: int = 10) -> List[Dict[str, Any]]: + """搜索音乐(向后兼容)""" + api = NeteaseAPI() + return api.search_music(keywords, cookies, limit) + + +def playlist_detail(playlist_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌单详情(向后兼容)""" + api = NeteaseAPI() + return api.get_playlist_detail(playlist_id, cookies) + + +def album_detail(album_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取专辑详情(向后兼容)""" + api = NeteaseAPI() + return api.get_album_detail(album_id, cookies) + + +def get_pic_url(pic_id: Optional[int], size: int = 300) -> str: + """获取图片URL(向后兼容)""" + api = NeteaseAPI() + return api.get_pic_url(pic_id, size) + + +def qr_login() -> Optional[str]: + """二维码登录(向后兼容)""" + manager = QRLoginManager() + return manager.qr_login() + + +if __name__ == "__main__": + # 测试代码 + print("网易云音乐API模块") + print("支持的功能:") + print("- 歌曲URL获取") + print("- 歌曲详情获取") + print("- 歌词获取") + print("- 音乐搜索") + print("- 歌单详情") + print("- 专辑详情") + print("- 二维码登录") diff --git a/music_downloader.py b/music_downloader.py new file mode 100644 index 0000000..0d95f39 --- /dev/null +++ b/music_downloader.py @@ -0,0 +1,587 @@ +"""音乐下载器模块 + +提供网易云音乐下载功能,包括: +- 音乐信息获取 +- 文件下载到本地 +- 内存下载 +- 音乐标签写入 +- 异步下载支持 +""" + +import os +import re +import asyncio +import aiohttp +import aiofiles +from io import BytesIO +from typing import Dict, List, Optional, Tuple, Any, Union +from pathlib import Path +from dataclasses import dataclass +from enum import Enum + +import requests +from mutagen.flac import FLAC +from mutagen.mp3 import MP3 +from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TRCK, APIC +from mutagen.mp4 import MP4 + +from music_api import NeteaseAPI, APIException +from cookie_manager import CookieManager + + +class AudioFormat(Enum): + """音频格式枚举""" + MP3 = "mp3" + FLAC = "flac" + M4A = "m4a" + UNKNOWN = "unknown" + + +class QualityLevel(Enum): + """音质等级枚举""" + STANDARD = "standard" # 标准 + EXHIGH = "exhigh" # 极高 + LOSSLESS = "lossless" # 无损 + HIRES = "hires" # Hi-Res + SKY = "sky" # 沉浸环绕声 + JYEFFECT = "jyeffect" # 高清环绕声 + JYMASTER = "jymaster" # 超清母带 + + +@dataclass +class MusicInfo: + """音乐信息数据类""" + id: int + name: str + artists: str + album: str + pic_url: str + duration: int + track_number: int + download_url: str + file_type: str + file_size: int + quality: str + lyric: str = "" + tlyric: str = "" + + +@dataclass +class DownloadResult: + """下载结果数据类""" + success: bool + file_path: Optional[str] = None + file_size: int = 0 + error_message: str = "" + music_info: Optional[MusicInfo] = None + + +class DownloadException(Exception): + """下载异常类""" + pass + + +class MusicDownloader: + """音乐下载器主类""" + + def __init__(self, download_dir: str = "downloads", max_concurrent: int = 3): + """ + 初始化音乐下载器 + + Args: + download_dir: 下载目录 + max_concurrent: 最大并发下载数 + """ + self.download_dir = Path(download_dir) + self.download_dir.mkdir(exist_ok=True) + self.max_concurrent = max_concurrent + + # 初始化依赖 + self.cookie_manager = CookieManager() + self.api = NeteaseAPI() + + # 支持的文件格式 + self.supported_formats = { + 'mp3': AudioFormat.MP3, + 'flac': AudioFormat.FLAC, + 'm4a': AudioFormat.M4A + } + + def _sanitize_filename(self, filename: str) -> str: + """清理文件名,移除非法字符 + + Args: + filename: 原始文件名 + + Returns: + 清理后的安全文件名 + """ + # 移除或替换非法字符 + illegal_chars = r'[<>:"/\\|?*]' + filename = re.sub(illegal_chars, '_', filename) + + # 移除前后空格和点 + filename = filename.strip(' .') + + # 限制长度 + if len(filename) > 200: + filename = filename[:200] + + return filename or "unknown" + + def _determine_file_extension(self, url: str, content_type: str = "") -> str: + """根据URL和Content-Type确定文件扩展名 + + Args: + url: 下载URL + content_type: HTTP Content-Type头 + + Returns: + 文件扩展名 + """ + # 首先尝试从URL获取 + if '.flac' in url.lower(): + return '.flac' + elif '.mp3' in url.lower(): + return '.mp3' + elif '.m4a' in url.lower(): + return '.m4a' + + # 从Content-Type获取 + content_type = content_type.lower() + if 'flac' in content_type: + return '.flac' + elif 'mpeg' in content_type or 'mp3' in content_type: + return '.mp3' + elif 'mp4' in content_type or 'm4a' in content_type: + return '.m4a' + + return '.mp3' # 默认 + + def get_music_info(self, music_id: int, quality: str = "standard") -> MusicInfo: + """获取音乐详细信息 + + Args: + music_id: 音乐ID + quality: 音质等级 + + Returns: + 音乐信息对象 + + Raises: + DownloadException: 获取信息失败时抛出 + """ + try: + # 获取cookies + cookies = self.cookie_manager.parse_cookies() + + # 获取音乐URL信息 + url_result = self.api.get_song_url(music_id, quality, cookies) + if not url_result.get('data') or not url_result['data']: + raise DownloadException(f"无法获取音乐ID {music_id} 的播放链接") + + song_data = url_result['data'][0] + download_url = song_data.get('url', '') + if not download_url: + raise DownloadException(f"音乐ID {music_id} 无可用的下载链接") + + # 获取音乐详情 + detail_result = self.api.get_song_detail(music_id) + if not detail_result.get('songs') or not detail_result['songs']: + raise DownloadException(f"无法获取音乐ID {music_id} 的详细信息") + + song_detail = detail_result['songs'][0] + + # 获取歌词 + lyric_result = self.api.get_lyric(music_id, cookies) + lyric = lyric_result.get('lrc', {}).get('lyric', '') if lyric_result else '' + tlyric = lyric_result.get('tlyric', {}).get('lyric', '') if lyric_result else '' + + # 构建艺术家字符串 + artists = '/'.join(artist['name'] for artist in song_detail.get('ar', [])) + + # 创建MusicInfo对象 + music_info = MusicInfo( + id=music_id, + name=song_detail.get('name', '未知歌曲'), + artists=artists or '未知艺术家', + album=song_detail.get('al', {}).get('name', '未知专辑'), + pic_url=song_detail.get('al', {}).get('picUrl', ''), + duration=song_detail.get('dt', 0) // 1000, # 转换为秒 + track_number=song_detail.get('no', 0), + download_url=download_url, + file_type=song_data.get('type', 'mp3').lower(), + file_size=song_data.get('size', 0), + quality=quality, + lyric=lyric, + tlyric=tlyric + ) + + return music_info + + except APIException as e: + raise DownloadException(f"API调用失败: {e}") + except Exception as e: + raise DownloadException(f"获取音乐信息时发生错误: {e}") + + def download_music_file(self, music_id: int, quality: str = "standard") -> DownloadResult: + """下载音乐文件到本地 + + Args: + music_id: 音乐ID + quality: 音质等级 + + Returns: + 下载结果对象 + """ + try: + # 获取音乐信息 + music_info = self.get_music_info(music_id, quality) + + # 生成文件名 + filename = f"{music_info.artists} - {music_info.name}" + safe_filename = self._sanitize_filename(filename) + + # 确定文件扩展名 + file_ext = self._determine_file_extension(music_info.download_url) + file_path = self.download_dir / f"{safe_filename}{file_ext}" + + # 检查文件是否已存在 + if file_path.exists(): + return DownloadResult( + success=True, + file_path=str(file_path), + file_size=file_path.stat().st_size, + music_info=music_info + ) + + # 下载文件 + response = requests.get(music_info.download_url, stream=True, timeout=30) + response.raise_for_status() + + # 写入文件 + with open(file_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + # 写入音乐标签 + self._write_music_tags(file_path, music_info) + + return DownloadResult( + success=True, + file_path=str(file_path), + file_size=file_path.stat().st_size, + music_info=music_info + ) + + except DownloadException: + raise + except requests.RequestException as e: + return DownloadResult( + success=False, + error_message=f"下载请求失败: {e}" + ) + except Exception as e: + return DownloadResult( + success=False, + error_message=f"下载过程中发生错误: {e}" + ) + + async def download_music_file_async(self, music_id: int, quality: str = "standard") -> DownloadResult: + """异步下载音乐文件到本地 + + Args: + music_id: 音乐ID + quality: 音质等级 + + Returns: + 下载结果对象 + """ + try: + # 获取音乐信息(同步操作) + music_info = self.get_music_info(music_id, quality) + + # 生成文件名 + filename = f"{music_info.artists} - {music_info.name}" + safe_filename = self._sanitize_filename(filename) + + # 确定文件扩展名 + file_ext = self._determine_file_extension(music_info.download_url) + file_path = self.download_dir / f"{safe_filename}{file_ext}" + + # 检查文件是否已存在 + if file_path.exists(): + return DownloadResult( + success=True, + file_path=str(file_path), + file_size=file_path.stat().st_size, + music_info=music_info + ) + + # 异步下载文件 + async with aiohttp.ClientSession() as session: + async with session.get(music_info.download_url) as response: + response.raise_for_status() + + async with aiofiles.open(file_path, 'wb') as f: + async for chunk in response.content.iter_chunked(8192): + await f.write(chunk) + + # 写入音乐标签 + self._write_music_tags(file_path, music_info) + + return DownloadResult( + success=True, + file_path=str(file_path), + file_size=file_path.stat().st_size, + music_info=music_info + ) + + except DownloadException: + raise + except aiohttp.ClientError as e: + return DownloadResult( + success=False, + error_message=f"异步下载请求失败: {e}" + ) + except Exception as e: + return DownloadResult( + success=False, + error_message=f"异步下载过程中发生错误: {e}" + ) + + def download_music_to_memory(self, music_id: int, quality: str = "standard") -> Tuple[bool, BytesIO, MusicInfo]: + """下载音乐到内存 + + Args: + music_id: 音乐ID + quality: 音质等级 + + Returns: + (是否成功, 音乐数据流, 音乐信息) + + Raises: + DownloadException: 下载失败时抛出 + """ + try: + # 获取音乐信息 + music_info = self.get_music_info(music_id, quality) + + # 下载到内存 + response = requests.get(music_info.download_url, timeout=30) + response.raise_for_status() + + # 创建BytesIO对象 + audio_data = BytesIO(response.content) + + return True, audio_data, music_info + + except DownloadException: + raise + except requests.RequestException as e: + raise DownloadException(f"下载到内存失败: {e}") + except Exception as e: + raise DownloadException(f"内存下载过程中发生错误: {e}") + + async def download_batch_async(self, music_ids: List[int], quality: str = "standard") -> List[DownloadResult]: + """批量异步下载音乐 + + Args: + music_ids: 音乐ID列表 + quality: 音质等级 + + Returns: + 下载结果列表 + """ + semaphore = asyncio.Semaphore(self.max_concurrent) + + async def download_with_semaphore(music_id: int) -> DownloadResult: + async with semaphore: + return await self.download_music_file_async(music_id, quality) + + tasks = [download_with_semaphore(music_id) for music_id in music_ids] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 处理异常结果 + processed_results = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + processed_results.append(DownloadResult( + success=False, + error_message=f"下载音乐ID {music_ids[i]} 时发生异常: {result}" + )) + else: + processed_results.append(result) + + return processed_results + + def _write_music_tags(self, file_path: Path, music_info: MusicInfo) -> None: + """写入音乐标签信息 + + Args: + file_path: 音乐文件路径 + music_info: 音乐信息 + """ + try: + file_ext = file_path.suffix.lower() + + if file_ext == '.mp3': + self._write_mp3_tags(file_path, music_info) + elif file_ext == '.flac': + self._write_flac_tags(file_path, music_info) + elif file_ext == '.m4a': + self._write_m4a_tags(file_path, music_info) + + except Exception as e: + print(f"写入音乐标签失败: {e}") + + def _write_mp3_tags(self, file_path: Path, music_info: MusicInfo) -> None: + """写入MP3标签""" + try: + audio = MP3(str(file_path), ID3=ID3) + + # 添加ID3标签 + audio.tags.add(TIT2(encoding=3, text=music_info.name)) + audio.tags.add(TPE1(encoding=3, text=music_info.artists)) + audio.tags.add(TALB(encoding=3, text=music_info.album)) + + if music_info.track_number > 0: + audio.tags.add(TRCK(encoding=3, text=str(music_info.track_number))) + + # 下载并添加封面 + if music_info.pic_url: + try: + pic_response = requests.get(music_info.pic_url, timeout=10) + pic_response.raise_for_status() + audio.tags.add(APIC( + encoding=3, + mime='image/jpeg', + type=3, + desc='Cover', + data=pic_response.content + )) + except: + pass # 封面下载失败不影响主流程 + + audio.save() + except Exception as e: + print(f"写入MP3标签失败: {e}") + + def _write_flac_tags(self, file_path: Path, music_info: MusicInfo) -> None: + """写入FLAC标签""" + try: + audio = FLAC(str(file_path)) + + audio['TITLE'] = music_info.name + audio['ARTIST'] = music_info.artists + audio['ALBUM'] = music_info.album + + if music_info.track_number > 0: + audio['TRACKNUMBER'] = str(music_info.track_number) + + # 下载并添加封面 + if music_info.pic_url: + try: + pic_response = requests.get(music_info.pic_url, timeout=10) + pic_response.raise_for_status() + + from mutagen.flac import Picture + picture = Picture() + picture.type = 3 # Cover (front) + picture.mime = 'image/jpeg' + picture.desc = 'Cover' + picture.data = pic_response.content + audio.add_picture(picture) + except: + pass # 封面下载失败不影响主流程 + + audio.save() + except Exception as e: + print(f"写入FLAC标签失败: {e}") + + def _write_m4a_tags(self, file_path: Path, music_info: MusicInfo) -> None: + """写入M4A标签""" + try: + audio = MP4(str(file_path)) + + audio['\xa9nam'] = music_info.name + audio['\xa9ART'] = music_info.artists + audio['\xa9alb'] = music_info.album + + if music_info.track_number > 0: + audio['trkn'] = [(music_info.track_number, 0)] + + # 下载并添加封面 + if music_info.pic_url: + try: + pic_response = requests.get(music_info.pic_url, timeout=10) + pic_response.raise_for_status() + audio['covr'] = [pic_response.content] + except: + pass # 封面下载失败不影响主流程 + + audio.save() + except Exception as e: + print(f"写入M4A标签失败: {e}") + + def get_download_progress(self, music_id: int, quality: str = "standard") -> Dict[str, Any]: + """获取下载进度信息 + + Args: + music_id: 音乐ID + quality: 音质等级 + + Returns: + 包含进度信息的字典 + """ + try: + music_info = self.get_music_info(music_id, quality) + + filename = f"{music_info.artists} - {music_info.name}" + safe_filename = self._sanitize_filename(filename) + file_ext = self._determine_file_extension(music_info.download_url) + file_path = self.download_dir / f"{safe_filename}{file_ext}" + + if file_path.exists(): + current_size = file_path.stat().st_size + progress = (current_size / music_info.file_size * 100) if music_info.file_size > 0 else 0 + + return { + 'music_id': music_id, + 'filename': safe_filename + file_ext, + 'total_size': music_info.file_size, + 'current_size': current_size, + 'progress': min(progress, 100), + 'completed': current_size >= music_info.file_size + } + else: + return { + 'music_id': music_id, + 'filename': safe_filename + file_ext, + 'total_size': music_info.file_size, + 'current_size': 0, + 'progress': 0, + 'completed': False + } + + except Exception as e: + return { + 'music_id': music_id, + 'error': str(e), + 'progress': 0, + 'completed': False + } + + +if __name__ == "__main__": + # 测试代码 + downloader = MusicDownloader() + print("音乐下载器模块") + print("支持的功能:") + print("- 同步下载") + print("- 异步下载") + print("- 批量下载") + print("- 内存下载") + print("- 音乐标签写入") + print("- 下载进度跟踪") \ No newline at end of file diff --git a/qr_login.py b/qr_login.py index 1c8ba3d..e108178 100644 --- a/qr_login.py +++ b/qr_login.py @@ -1,9 +1,364 @@ -from music_api import qr_login -print("开始网易云音乐二维码登录流程...") -cookies = qr_login() - -if cookies: - print("\nCookie信息:") - print(cookies) -else: - print("登录失败,请重试。") +"""网易云音乐二维码登录模块 + +提供网易云音乐二维码登录功能,包括: +- 二维码生成和显示 +- 登录状态检查 +- Cookie获取和保存 +- 用户友好的交互界面 +""" + +import sys +import time +import logging +from typing import Optional, Dict, Any, Tuple +from pathlib import Path + +try: + from music_api import QRLoginManager, APIException + from cookie_manager import CookieManager, CookieException +except ImportError as e: + print(f"导入模块失败: {e}") + print("请确保 music_api.py 和 cookie_manager.py 文件存在且可用") + sys.exit(1) + + +class QRLoginClient: + """二维码登录客户端""" + + def __init__(self, cookie_file: str = "cookie.txt"): + """ + 初始化二维码登录客户端 + + Args: + cookie_file: Cookie保存文件路径 + """ + self.cookie_manager = CookieManager(cookie_file) + self.qr_manager = QRLoginManager() + self.logger = logging.getLogger(__name__) + + # 配置日志 + if not self.logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + + def check_existing_login(self) -> bool: + """检查是否已有有效登录 + + Returns: + 是否已登录 + """ + try: + if self.cookie_manager.is_cookie_valid(): + self.logger.info("检测到有效的登录Cookie") + return True + else: + self.logger.info("未检测到有效的登录Cookie") + return False + except Exception as e: + self.logger.error(f"检查登录状态失败: {e}") + return False + + def interactive_login(self) -> Tuple[bool, Optional[str]]: + """交互式二维码登录 + + Returns: + (登录是否成功, 错误信息) + """ + try: + print("\n=== 网易云音乐二维码登录 ===") + + # 检查现有登录状态 + if self.check_existing_login(): + choice = input("检测到已有有效登录,是否重新登录?(y/N): ").strip().lower() + if choice not in ['y', 'yes', '是']: + print("使用现有登录状态") + return True, None + + print("\n开始二维码登录流程...") + + # 生成二维码 + print("正在生成二维码...") + qr_result = self.qr_manager.create_qr_login() + + if not qr_result['success']: + error_msg = f"生成二维码失败: {qr_result.get('message', '未知错误')}" + self.logger.error(error_msg) + return False, error_msg + + qr_key = qr_result['qr_key'] + print(f"\n二维码已生成!") + print(f"请使用网易云音乐手机APP扫描二维码进行登录") + print(f"二维码有效期: 3分钟") + print("\n等待扫码中...") + + # 轮询检查登录状态 + max_attempts = 60 # 最多等待5分钟 + attempt = 0 + + while attempt < max_attempts: + try: + # 检查登录状态 + status_result = self.qr_manager.check_qr_login(qr_key) + + if status_result['success']: + if status_result['status'] == 'success': + # 登录成功 + cookie = status_result.get('cookie', '') + if cookie: + # 保存Cookie + success = self.save_cookie(cookie) + if success: + print("\n✅ 登录成功!Cookie已保存") + return True, None + else: + error_msg = "登录成功但Cookie保存失败" + self.logger.error(error_msg) + return False, error_msg + else: + error_msg = "登录成功但未获取到Cookie" + self.logger.error(error_msg) + return False, error_msg + + elif status_result['status'] == 'waiting': + # 等待扫码 + if attempt % 10 == 0: # 每10次显示一次提示 + print(f"等待扫码中... ({attempt + 1}/{max_attempts})") + + elif status_result['status'] == 'scanned': + # 已扫码,等待确认 + print("二维码已扫描,请在手机上确认登录") + + elif status_result['status'] == 'expired': + # 二维码过期 + error_msg = "二维码已过期,请重新尝试" + print(f"\n❌ {error_msg}") + return False, error_msg + + elif status_result['status'] == 'error': + # 登录错误 + error_msg = f"登录失败: {status_result.get('message', '未知错误')}" + print(f"\n❌ {error_msg}") + return False, error_msg + + else: + self.logger.warning(f"检查登录状态失败: {status_result.get('message', '未知错误')}") + + # 等待5秒后重试 + time.sleep(5) + attempt += 1 + + except KeyboardInterrupt: + print("\n用户取消登录") + return False, "用户取消登录" + except Exception as e: + self.logger.error(f"检查登录状态时发生错误: {e}") + time.sleep(5) + attempt += 1 + + # 超时 + error_msg = "登录超时,请重新尝试" + print(f"\n❌ {error_msg}") + return False, error_msg + + except APIException as e: + error_msg = f"API调用失败: {e}" + self.logger.error(error_msg) + return False, error_msg + except Exception as e: + error_msg = f"登录过程中发生未知错误: {e}" + self.logger.error(error_msg) + return False, error_msg + + def save_cookie(self, cookie: str) -> bool: + """保存Cookie到文件 + + Args: + cookie: Cookie字符串 + + Returns: + 是否保存成功 + """ + try: + # 备份现有Cookie(如果存在) + if self.cookie_manager.cookie_file.exists(): + try: + backup_path = self.cookie_manager.backup_cookie() + self.logger.info(f"已备份现有Cookie到: {backup_path}") + except Exception as e: + self.logger.warning(f"备份Cookie失败: {e}") + + # 保存新Cookie + success = self.cookie_manager.write_cookie(cookie) + + if success: + # 验证保存的Cookie + if self.cookie_manager.is_cookie_valid(): + self.logger.info("Cookie保存并验证成功") + return True + else: + self.logger.warning("Cookie保存成功但验证失败") + return False + else: + self.logger.error("Cookie保存失败") + return False + + except CookieException as e: + self.logger.error(f"Cookie操作失败: {e}") + return False + except Exception as e: + self.logger.error(f"保存Cookie时发生错误: {e}") + return False + + def show_login_info(self) -> None: + """显示登录信息""" + try: + info = self.cookie_manager.get_cookie_info() + + print("\n=== 登录状态信息 ===") + print(f"Cookie文件: {info['file_path']}") + print(f"文件存在: {'是' if info['file_exists'] else '否'}") + print(f"Cookie数量: {info['cookie_count']}") + print(f"登录状态: {'有效' if info['is_valid'] else '无效'}") + + if info.get('last_modified'): + print(f"最后更新: {info['last_modified']}") + + if info['is_valid']: + present_cookies = info.get('important_cookies_present', []) + print(f"重要Cookie: {', '.join(present_cookies)}") + else: + missing_cookies = info.get('missing_important_cookies', []) + if missing_cookies: + print(f"缺少Cookie: {', '.join(missing_cookies)}") + + except Exception as e: + print(f"获取登录信息失败: {e}") + + def logout(self) -> bool: + """登出(清除Cookie) + + Returns: + 是否登出成功 + """ + try: + # 备份Cookie + if self.cookie_manager.cookie_file.exists(): + try: + backup_path = self.cookie_manager.backup_cookie("logout") + print(f"Cookie已备份到: {backup_path}") + except Exception as e: + self.logger.warning(f"备份Cookie失败: {e}") + + # 清除Cookie + success = self.cookie_manager.clear_cookie() + + if success: + print("已成功登出") + return True + else: + print("登出失败") + return False + + except Exception as e: + print(f"登出时发生错误: {e}") + return False + + +def main(): + """主函数""" + # 配置日志 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + client = QRLoginClient() + + # 解析命令行参数 + if len(sys.argv) > 1: + command = sys.argv[1].lower() + + if command == 'login': + # 执行登录 + success, error = client.interactive_login() + if success: + print("\n登录完成!") + client.show_login_info() + sys.exit(0) + else: + print(f"\n登录失败: {error}") + sys.exit(1) + + elif command == 'status' or command == 'info': + # 显示登录状态 + client.show_login_info() + sys.exit(0) + + elif command == 'logout': + # 登出 + success = client.logout() + sys.exit(0 if success else 1) + + elif command == 'help' or command == '-h' or command == '--help': + # 显示帮助 + print("网易云音乐二维码登录工具") + print("\n用法:") + print(" python qr_login.py [命令]") + print("\n命令:") + print(" login - 执行二维码登录") + print(" status - 显示登录状态") + print(" logout - 登出(清除Cookie)") + print(" help - 显示此帮助信息") + print("\n如果不提供命令,将进入交互模式") + sys.exit(0) + + else: + print(f"未知命令: {command}") + print("使用 'python qr_login.py help' 查看帮助") + sys.exit(1) + + # 交互模式 + try: + while True: + print("\n=== 网易云音乐登录工具 ===") + print("1. 二维码登录") + print("2. 查看登录状态") + print("3. 登出") + print("4. 退出") + + choice = input("\n请选择操作 (1-4): ").strip() + + if choice == '1': + success, error = client.interactive_login() + if success: + print("\n登录成功!") + client.show_login_info() + else: + print(f"\n登录失败: {error}") + + elif choice == '2': + client.show_login_info() + + elif choice == '3': + client.logout() + + elif choice == '4': + print("再见!") + break + + else: + print("无效选择,请重试") + + except KeyboardInterrupt: + print("\n\n程序已退出") + sys.exit(0) + except Exception as e: + print(f"\n程序运行出错: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index 3c559fc..4fdb48e 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/使用文档.md b/使用文档.md new file mode 100644 index 0000000..f1adb94 --- /dev/null +++ b/使用文档.md @@ -0,0 +1,465 @@ +# 网易云音乐工具箱 - 使用文档 + +## 📖 项目简介 + +网易云音乐工具箱是一个功能强大的网易云音乐解析工具,支持歌曲搜索、单曲解析、歌单解析、专辑解析和音乐下载等功能。项目提供了友好的Web界面和完整的API接口,支持多种音质选择,包括无损、Hi-Res等高品质音频格式。 + +## ✨ 功能特性 + +### 🎵 核心功能 +- **歌曲搜索**:支持关键词搜索网易云音乐库中的歌曲 +- **单曲解析**:解析单首歌曲的详细信息和下载链接 +- **歌单解析**:批量解析歌单中的所有歌曲 +- **专辑解析**:批量解析专辑中的所有歌曲 +- **音乐下载**:支持多种音质的音乐文件下载 + +### 🎧 音质支持 +- `standard`:标准音质 (128kbps) +- `exhigh`:极高音质 (320kbps) +- `lossless`:无损音质 (FLAC) +- `hires`:Hi-Res音质 (24bit/96kHz) +- `jyeffect`:高清环绕声 +- `sky`:沉浸环绕声 +- `jymaster`:超清母带 + +### 🌐 使用方式 +- **Web界面**:直观的网页操作界面 +- **API接口**:完整的RESTful API +- **批量处理**:支持歌单和专辑的批量解析 + +## 🚀 快速开始 + +### 环境要求 +- Python 3.7+ +- 网易云音乐黑胶会员账号(用于获取高音质资源) + +### 安装步骤 + +1. **克隆项目** + ```bash + git clone https://github.com/Suxiaoqinx/Netease_url.git + cd Netease_url + ``` + +2. **安装依赖** + ```bash + pip install -r requirements.txt + ``` + +3. **配置Cookie** + + 在 `cookie.txt` 文件中填入黑胶会员账号的Cookie: + ``` + MUSIC_U=你的MUSIC_U值;os=pc;appver=8.9.70; + ``` + + > 💡 **获取Cookie方法**: + > 1. 登录网易云音乐网页版 + > 2. 按F12打开开发者工具 + > 3. 在Network标签页找到任意请求 + > 4. 复制请求头中的Cookie值 + +4. **启动服务** + ```bash + python main.py + ``` + +5. **访问界面** + + 打开浏览器访问:`http://localhost:5000` + +## 🎯 使用指南 + +### Web界面使用 + +#### 1. 歌曲搜索 +1. 选择「歌曲搜索」功能 +2. 输入搜索关键词 +3. 点击「搜索」按钮 +4. 在搜索结果中点击「解析」或「下载」按钮 + +#### 2. 单曲解析 +1. 选择「单曲解析」功能 +2. 输入歌曲ID或网易云音乐链接 +3. 选择音质等级 +4. 点击「解析」按钮查看歌曲信息 +5. 点击「下载」按钮下载音乐文件 + +#### 3. 歌单解析 +1. 选择「歌单解析」功能 +2. 输入歌单ID或歌单链接 +3. 点击「解析歌单」按钮 +4. 查看歌单信息和歌曲列表 +5. 点击单首歌曲的「解析」或「下载」按钮 + +#### 4. 专辑解析 +1. 选择「专辑解析」功能 +2. 输入专辑ID或专辑链接 +3. 点击「解析专辑」按钮 +4. 查看专辑信息和歌曲列表 +5. 点击单首歌曲的「解析」或「下载」按钮 + +#### 5. 音乐下载 +1. 选择「音乐下载」功能 +2. 输入音乐ID或链接 +3. 选择下载音质 +4. 点击「下载」按钮 +5. 等待下载完成 + +### 链接格式支持 + +项目支持多种网易云音乐链接格式: + +- **歌曲链接**:`https://music.163.com/song?id=123456` +- **歌单链接**:`https://music.163.com/playlist?id=123456` +- **专辑链接**:`https://music.163.com/album?id=123456` +- **直接ID**:`123456` + +## 🔌 API接口文档 + +### 基础信息 +- **服务地址**:`http://localhost:5000` +- **请求方式**:POST(部分接口支持GET) +- **响应格式**:JSON + +### 接口列表 + +#### 1. 健康检查 +```http +GET /health +``` + +**响应示例**: +```json +{ + "status": 200, + "success": true, + "message": "服务运行正常", + "data": { + "service": "网易云音乐API", + "version": "1.0.0", + "uptime": "2小时3分钟" + } +} +``` + +#### 2. 歌曲搜索 +```http +POST /search +``` + +**请求参数**: +```json +{ + "keyword": "搜索关键词", + "limit": 30 +} +``` + +**响应示例**: +```json +{ + "status": 200, + "success": true, + "message": "搜索完成", + "data": [ + { + "id": "123456", + "name": "歌曲名称", + "artists": "艺术家", + "album": "专辑名称", + "picUrl": "封面图片URL" + } + ] +} +``` + +#### 3. 单曲解析 +```http +POST /song +``` + +**请求参数**: +```json +{ + "url": "歌曲ID或链接", + "level": "lossless", + "type": "json" +} +``` + +**响应示例**: +```json +{ + "status": 200, + "success": true, + "message": "解析成功", + "data": { + "name": "歌曲名称", + "ar_name": "艺术家", + "al_name": "专辑名称", + "level": "lossless", + "size": "45.2MB", + "url": "下载链接", + "pic": "封面图片", + "lyric": "歌词内容" + } +} +``` + +#### 4. 歌单解析 +```http +POST /playlist +``` + +**请求参数**: +```json +{ + "id": "歌单ID" +} +``` + +**响应示例**: +```json +{ + "status": 200, + "success": true, + "message": "歌单解析成功", + "data": { + "playlist": { + "name": "歌单名称", + "creator": "创建者", + "description": "歌单描述", + "coverImgUrl": "封面图片", + "trackCount": 50, + "tracks": [ + { + "id": "123456", + "name": "歌曲名称", + "artists": "艺术家", + "album": "专辑名称", + "picUrl": "封面图片" + } + ] + } + } +} +``` + +#### 5. 专辑解析 +```http +POST /album +``` + +**请求参数**: +```json +{ + "id": "专辑ID" +} +``` + +**响应示例**: +```json +{ + "status": 200, + "success": true, + "message": "专辑解析成功", + "data": { + "album": { + "name": "专辑名称", + "artist": "艺术家", + "description": "专辑描述", + "coverImgUrl": "封面图片", + "songs": [ + { + "id": "123456", + "name": "歌曲名称", + "artists": "艺术家", + "album": "专辑名称", + "picUrl": "封面图片" + } + ] + } + } +} +``` + +#### 6. 音乐下载 +```http +POST /download +``` + +**请求参数**: +```json +{ + "id": "音乐ID", + "quality": "lossless" +} +``` + +**响应**:返回音频文件流,支持直接下载 + +## 🐳 Docker部署 + +### 使用Docker Compose + +1. **修改配置**(可选) + + 编辑 `docker-compose.yml` 文件修改端口: + ```yaml + ports: + - "8080:5000" # 将服务映射到8080端口 + ``` + +2. **启动服务** + ```bash + docker-compose up -d + ``` + +3. **查看日志** + ```bash + docker-compose logs -f + ``` + +4. **停止服务** + ```bash + docker-compose down + ``` + +### 使用Dockerfile + +```bash +# 构建镜像 +docker build -t netease-music-api . + +# 运行容器 +docker run -d -p 5000:5000 -v $(pwd)/downloads:/app/downloads netease-music-api +``` + +## ⚙️ 配置说明 + +### 环境变量 + +可以通过 `.env` 文件或环境变量配置服务: + +```bash +# 服务配置 +HOST=0.0.0.0 +PORT=5000 +DEBUG=false + +# 下载配置 +DOWNLOADS_DIR=downloads +MAX_FILE_SIZE=524288000 # 500MB + +# 日志配置 +LOG_LEVEL=INFO +``` + +### Cookie配置 + +在 `cookie.txt` 文件中配置网易云音乐Cookie: + +``` +MUSIC_U=你的MUSIC_U值;os=pc;appver=8.9.70; +``` + +> ⚠️ **重要提示**: +> - 必须使用黑胶会员账号的Cookie +> - Cookie格式必须严格按照示例填写 +> - 定期更新Cookie以保持有效性 + +## 🔧 故障排除 + +### 常见问题 + +#### 1. Cookie无效 +**问题**:提示Cookie无效或过期 + +**解决方案**: +- 确认使用的是黑胶会员账号 +- 重新获取Cookie并更新 `cookie.txt` +- 检查Cookie格式是否正确 + +#### 2. 无法下载高音质 +**问题**:只能下载标准音质 + +**解决方案**: +- 确认账号是黑胶会员 +- 检查Cookie是否有效 +- 确认歌曲本身支持高音质 + +#### 3. 服务启动失败 +**问题**:运行 `python main.py` 报错 + +**解决方案**: +- 检查Python版本(需要3.7+) +- 安装所有依赖:`pip install -r requirements.txt` +- 检查端口5000是否被占用 + +#### 4. 下载文件损坏 +**问题**:下载的音频文件无法播放 + +**解决方案**: +- 检查网络连接是否稳定 +- 重新下载文件 +- 尝试其他音质选项 + +### 日志查看 + +服务运行时会生成日志文件 `music_api.log`,可以查看详细的错误信息: + +```bash +tail -f music_api.log +``` + +## 📝 开发说明 + +### 项目结构 + +``` +Netease_url/ +├── main.py # 主程序入口 +├── music_api.py # 音乐API核心模块 +├── music_downloader.py # 音乐下载模块 +├── cookie_manager.py # Cookie管理模块 +├── qr_login.py # 二维码登录模块 +├── templates/ +│ └── index.html # Web界面模板 +├── downloads/ # 下载文件目录 +├── requirements.txt # Python依赖 +├── Dockerfile # Docker构建文件 +├── docker-compose.yml # Docker Compose配置 +└── README.md # 项目说明 +``` + +### 技术栈 + +- **后端**:Flask + Python +- **前端**:Bootstrap + jQuery +- **音频处理**:mutagen +- **HTTP客户端**:aiohttp + requests +- **容器化**:Docker + +## 📄 许可证 + +本项目采用 MIT 许可证开源,但**禁止用于任何商业或付费项目**。 + +## 🤝 贡献指南 + +欢迎提交Issue和Pull Request! + +1. Fork本项目 +2. 创建特性分支:`git checkout -b feature/new-feature` +3. 提交更改:`git commit -am 'Add new feature'` +4. 推送分支:`git push origin feature/new-feature` +5. 提交Pull Request + +## 📞 联系方式 + +- **GitHub Issues**:[提交问题](https://github.com/Suxiaoqinx/Netease_url/issues) +- **作者博客**:[https://www.toubiec.cn](https://www.toubiec.cn) + +--- + +**⭐ 如果这个项目对你有帮助,请给个Star支持一下!** \ No newline at end of file