Add files via upload

This commit is contained in:
苏晓晴 2025-08-25 01:23:41 +08:00 committed by GitHub
parent 6e7c50cf6f
commit 93e19dbb78
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 3353 additions and 721 deletions

274
README.md
View File

@ -1,138 +1,138 @@
# 网易云无损音乐解析
> **声明**
> 本项目为开源软件,遵循 MIT 许可证。任何个人或组织均可自由使用、修改和分发本项目的源代码。但本项目及其任何衍生作品**禁止用于任何商业或付费项目**。如有违反,将视为对本项目许可证的侵犯。欢迎大家在遵守开源精神和许可证的前提下积极贡献和分享代码。
---
## 功能简介
本项目可解析网易云音乐无损音质下载链接,支持多种音质选择,支持 API 与命令行GUI两种模式。
---
## 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
### 2. 配置 Cookie
请在 `cookie.txt` 文件中填入黑胶会员账号的 Cookie格式如下
```
MUSIC_U=你的MUSIC_U值;os=pc;appver=8.9.70;
```
> 具体值请参考 `cookie.txt` 示例,替换为你自己的即可。
### 3. 运行
#### GUI 模式
```bash
python main.py --mode gui --url <网易云音乐地址> --level <音质参数>
```
#### API 模式
```bash
python main.py --mode api
```
- 访问接口http://ip:port/类型解析
- 支持 GET 和 POST 请求
---
## 参数说明
### GUI 模式参数
| 参数 | 说明 |
| ------------ | ---------------------------- |
| --mode | 启动模式api 或 gui |
| --url | 需要解析的网易云音乐地址 |
| --level | 音质参数(见下方音质说明) |
### API 模式参数
| 参数 | 说明 |
| ------------ | -------------------------------------------- |
| url / ids | 网易云音乐地址或歌曲ID二选一 |
| level | 音质参数(见下方音质说明) |
| type | 解析类型json / down / text三选一 |
| 类型参数 | 说明 |
| ------------ | -------------------------------------------- |
| Song_v1 | 单曲解析 |
| search | 搜索解析 |
| playlist | 歌单解析 |
| album | 专辑解析 |
---
## 音质参数说明(仅限单曲解析)
- `standard`:标准音质
- `exhigh`:极高音质
- `lossless`:无损音质
- `hires`Hi-Res音质
- `jyeffect`:高清环绕声
- `sky`:沉浸环绕声
- `jymaster`:超清母带
> 黑胶VIP音质standard, exhigh, lossless, hires, jyeffect
> 黑胶SVIP音质sky, jymaster
---
## Docker 一键部署
1. **修改参数**
- 如需修改端口,请编辑 `.env``docker-compose.yml` 文件中的 `ports` 配置,例如:
```yaml
ports:
- "8080:5000"
```
2. **启动服务**
```bash
docker-compose up -d
```
---
## 在线演示
[在线解析](https://api.toubiec.cn/wyapi.html)
---
## 注意事项
- 必须使用黑胶会员账号的 Cookie 才能解析高音质资源。
- Cookie 格式请严格按照 `cookie.txt` 示例填写。
---
## 致谢
- [Ravizhan](https://github.com/ravizhan)
---
## 反馈与交流
- 在 Github [Issues](https://github.com/Suxiaoqinx/Netease_url/issues) 提交反馈
- 或访问 [我的博客](https://www.toubiec.cn)
---
# 网易云无损音乐解析
> **声明**
> 本项目为开源软件,遵循 MIT 许可证。任何个人或组织均可自由使用、修改和分发本项目的源代码。但本项目及其任何衍生作品**禁止用于任何商业或付费项目**。如有违反,将视为对本项目许可证的侵犯。欢迎大家在遵守开源精神和许可证的前提下积极贡献和分享代码。
---
## 功能简介
本项目可解析网易云音乐无损音质下载链接,支持多种音质选择,支持 API 与命令行GUI两种模式。
---
## 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
### 2. 配置 Cookie
请在 `cookie.txt` 文件中填入黑胶会员账号的 Cookie格式如下
```
MUSIC_U=你的MUSIC_U值;os=pc;appver=8.9.70;
```
> 具体值请参考 `cookie.txt` 示例,替换为你自己的即可。
### 3. 运行
#### GUI 模式
```bash
python main.py --mode gui --url <网易云音乐地址> --level <音质参数>
```
#### API 模式
```bash
python main.py --mode api
```
- 访问接口http://ip:port/类型解析
- 支持 GET 和 POST 请求
---
## 参数说明
### GUI 模式参数
| 参数 | 说明 |
| ------------ | ---------------------------- |
| --mode | 启动模式api 或 gui |
| --url | 需要解析的网易云音乐地址 |
| --level | 音质参数(见下方音质说明) |
### API 模式参数
| 参数 | 说明 |
| ------------ | -------------------------------------------- |
| url / ids | 网易云音乐地址或歌曲ID二选一 |
| level | 音质参数(见下方音质说明) |
| type | 解析类型json / down / text三选一 |
| 类型参数 | 说明 |
| ------------ | -------------------------------------------- |
| Song_v1 | 单曲解析 |
| search | 搜索解析 |
| playlist | 歌单解析 |
| album | 专辑解析 |
---
## 音质参数说明(仅限单曲解析)
- `standard`:标准音质
- `exhigh`:极高音质
- `lossless`:无损音质
- `hires`Hi-Res音质
- `jyeffect`:高清环绕声
- `sky`:沉浸环绕声
- `jymaster`:超清母带
> 黑胶VIP音质standard, exhigh, lossless, hires, jyeffect
> 黑胶SVIP音质sky, jymaster
---
## Docker 一键部署
1. **修改参数**
- 如需修改端口,请编辑 `.env``docker-compose.yml` 文件中的 `ports` 配置,例如:
```yaml
ports:
- "8080:5000"
```
2. **启动服务**
```bash
docker-compose up -d
```
---
## 在线演示
[在线解析](https://api.toubiec.cn/wyapi.html)
---
## 注意事项
- 必须使用黑胶会员账号的 Cookie 才能解析高音质资源。
- Cookie 格式请严格按照 `cookie.txt` 示例填写。
---
## 致谢
- [Ravizhan](https://github.com/ravizhan)
---
## 反馈与交流
- 在 Github [Issues](https://github.com/Suxiaoqinx/Netease_url/issues) 提交反馈
- 或访问 [我的博客](https://www.toubiec.cn)
---
欢迎 Star、Fork 和 PR

View File

@ -1,19 +1,469 @@
import os
from typing import Dict
class CookieManager:
def __init__(self, cookie_file: str = None):
if cookie_file is None:
script_dir = os.path.dirname(os.path.abspath(__file__))
cookie_file = os.path.join(script_dir, 'cookie.txt')
self.cookie_file = cookie_file
def read_cookie(self) -> str:
with open(self.cookie_file, 'r', encoding='utf-8') as f:
return f.read()
@staticmethod
def parse_cookie(text: str) -> Dict[str, str]:
cookie_ = [item.strip().split('=', 1) for item in text.strip().split(';') if item]
cookie_ = {k.strip(): v.strip() for k, v in cookie_}
return cookie_
"""Cookie管理器模块
提供网易云音乐Cookie管理功能包括
- Cookie文件读取和写入
- Cookie格式验证和解析
- Cookie有效性检查
- 自动过期处理
"""
import os
import json
import time
from typing import Dict, Optional, List, Tuple, Any
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
@dataclass
class CookieInfo:
"""Cookie信息数据类"""
name: str
value: str
domain: str = ""
path: str = "/"
expires: Optional[int] = None
secure: bool = False
http_only: bool = False
class CookieException(Exception):
"""Cookie相关异常类"""
pass
class CookieManager:
"""Cookie管理器主类"""
def __init__(self, cookie_file: str = "cookie.txt"):
"""
初始化Cookie管理器
Args:
cookie_file: Cookie文件路径
"""
self.cookie_file = Path(cookie_file)
self.logger = logging.getLogger(__name__)
# 网易云音乐相关的重要Cookie字段
self.important_cookies = {
'MUSIC_U', # 用户标识
'MUSIC_A', # 用户认证
'__csrf', # CSRF令牌
'NMTID', # 设备标识
'WEVNSM', # 会话管理
'WNMCID', # 客户端标识
}
# 确保cookie文件存在
self._ensure_cookie_file_exists()
def _ensure_cookie_file_exists(self) -> None:
"""确保Cookie文件存在"""
if not self.cookie_file.exists():
self.cookie_file.touch()
self.logger.info(f"创建Cookie文件: {self.cookie_file}")
def read_cookie(self) -> str:
"""读取Cookie文件内容
Returns:
Cookie字符串内容
Raises:
CookieException: 读取失败时抛出
"""
try:
if not self.cookie_file.exists():
self.logger.warning(f"Cookie文件不存在: {self.cookie_file}")
return ""
content = self.cookie_file.read_text(encoding='utf-8').strip()
if not content:
self.logger.warning("Cookie文件为空")
return ""
self.logger.debug(f"成功读取Cookie文件长度: {len(content)}")
return content
except UnicodeDecodeError as e:
raise CookieException(f"Cookie文件编码错误: {e}")
except PermissionError as e:
raise CookieException(f"没有权限读取Cookie文件: {e}")
except Exception as e:
raise CookieException(f"读取Cookie文件失败: {e}")
def write_cookie(self, cookie_content: str) -> bool:
"""写入Cookie到文件
Args:
cookie_content: Cookie内容字符串
Returns:
是否写入成功
Raises:
CookieException: 写入失败时抛出
"""
try:
if not cookie_content or not cookie_content.strip():
raise CookieException("Cookie内容不能为空")
# 验证Cookie格式
if not self.validate_cookie_format(cookie_content):
raise CookieException("Cookie格式无效")
# 写入文件
self.cookie_file.write_text(cookie_content.strip(), encoding='utf-8')
self.logger.info(f"成功写入Cookie到文件: {self.cookie_file}")
return True
except PermissionError as e:
raise CookieException(f"没有权限写入Cookie文件: {e}")
except Exception as e:
raise CookieException(f"写入Cookie文件失败: {e}")
def parse_cookies(self) -> Dict[str, str]:
"""解析Cookie字符串为字典
Returns:
Cookie字典
Raises:
CookieException: 解析失败时抛出
"""
try:
cookie_content = self.read_cookie()
if not cookie_content:
return {}
return self.parse_cookie_string(cookie_content)
except Exception as e:
raise CookieException(f"解析Cookie失败: {e}")
def parse_cookie_string(self, cookie_string: str) -> Dict[str, str]:
"""解析Cookie字符串
Args:
cookie_string: Cookie字符串
Returns:
Cookie字典
"""
if not cookie_string or not cookie_string.strip():
return {}
cookies = {}
try:
# 处理多种Cookie格式
cookie_string = cookie_string.strip()
# 分割Cookie项
cookie_pairs = []
if ';' in cookie_string:
cookie_pairs = cookie_string.split(';')
elif '\n' in cookie_string:
cookie_pairs = cookie_string.split('\n')
else:
cookie_pairs = [cookie_string]
for pair in cookie_pairs:
pair = pair.strip()
if not pair or '=' not in pair:
continue
# 分割键值对
key, value = pair.split('=', 1)
key = key.strip()
value = value.strip()
if key and value:
cookies[key] = value
self.logger.debug(f"解析得到 {len(cookies)} 个Cookie项")
return cookies
except Exception as e:
self.logger.error(f"解析Cookie字符串失败: {e}")
return {}
def validate_cookie_format(self, cookie_string: str) -> bool:
"""验证Cookie格式是否有效
Args:
cookie_string: Cookie字符串
Returns:
是否格式有效
"""
if not cookie_string or not cookie_string.strip():
return False
try:
# 尝试解析Cookie
cookies = self.parse_cookie_string(cookie_string)
# 检查是否至少包含一个有效的Cookie
if not cookies:
return False
# 检查Cookie名称是否合法
for name, value in cookies.items():
if not name or not isinstance(name, str):
return False
if not isinstance(value, str):
return False
# 检查是否包含非法字符
if any(char in name for char in [' ', '\t', '\n', '\r', ';', ',']):
return False
return True
except Exception:
return False
def is_cookie_valid(self) -> bool:
"""检查Cookie是否有效
Returns:
Cookie是否有效
"""
try:
cookies = self.parse_cookies()
if not cookies:
self.logger.warning("Cookie为空")
return False
# 检查重要Cookie是否存在
missing_cookies = self.important_cookies - set(cookies.keys())
if missing_cookies:
self.logger.warning(f"缺少重要Cookie: {missing_cookies}")
return False
# 检查MUSIC_U是否有效基本验证
music_u = cookies.get('MUSIC_U', '')
if not music_u or len(music_u) < 10:
self.logger.warning("MUSIC_U Cookie无效")
return False
self.logger.debug("Cookie验证通过")
return True
except Exception as e:
self.logger.error(f"Cookie验证失败: {e}")
return False
def get_cookie_info(self) -> Dict[str, Any]:
"""获取Cookie详细信息
Returns:
包含Cookie信息的字典
"""
try:
cookies = self.parse_cookies()
info = {
'file_path': str(self.cookie_file),
'file_exists': self.cookie_file.exists(),
'file_size': self.cookie_file.stat().st_size if self.cookie_file.exists() else 0,
'cookie_count': len(cookies),
'is_valid': self.is_cookie_valid(),
'important_cookies_present': list(self.important_cookies & set(cookies.keys())),
'missing_important_cookies': list(self.important_cookies - set(cookies.keys())),
'all_cookie_names': list(cookies.keys())
}
# 添加文件修改时间
if self.cookie_file.exists():
mtime = self.cookie_file.stat().st_mtime
info['last_modified'] = datetime.fromtimestamp(mtime).isoformat()
return info
except Exception as e:
return {
'error': str(e),
'file_path': str(self.cookie_file),
'file_exists': False,
'is_valid': False
}
def backup_cookie(self, backup_suffix: str = None) -> str:
"""备份Cookie文件
Args:
backup_suffix: 备份文件后缀默认使用时间戳
Returns:
备份文件路径
Raises:
CookieException: 备份失败时抛出
"""
try:
if not self.cookie_file.exists():
raise CookieException("Cookie文件不存在无法备份")
if backup_suffix is None:
backup_suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = self.cookie_file.with_suffix(f".{backup_suffix}.bak")
# 复制文件内容
content = self.cookie_file.read_text(encoding='utf-8')
backup_path.write_text(content, encoding='utf-8')
self.logger.info(f"Cookie备份成功: {backup_path}")
return str(backup_path)
except Exception as e:
raise CookieException(f"备份Cookie文件失败: {e}")
def restore_cookie(self, backup_path: str) -> bool:
"""从备份恢复Cookie
Args:
backup_path: 备份文件路径
Returns:
是否恢复成功
Raises:
CookieException: 恢复失败时抛出
"""
try:
backup_file = Path(backup_path)
if not backup_file.exists():
raise CookieException(f"备份文件不存在: {backup_path}")
# 读取备份内容
backup_content = backup_file.read_text(encoding='utf-8')
# 验证备份内容
if not self.validate_cookie_format(backup_content):
raise CookieException("备份文件中的Cookie格式无效")
# 写入当前Cookie文件
self.write_cookie(backup_content)
self.logger.info(f"从备份恢复Cookie成功: {backup_path}")
return True
except Exception as e:
raise CookieException(f"恢复Cookie失败: {e}")
def clear_cookie(self) -> bool:
"""清空Cookie文件
Returns:
是否清空成功
"""
try:
if self.cookie_file.exists():
self.cookie_file.write_text("", encoding='utf-8')
self.logger.info("Cookie文件已清空")
return True
except Exception as e:
self.logger.error(f"清空Cookie文件失败: {e}")
return False
def update_cookie(self, new_cookies: Dict[str, str]) -> bool:
"""更新Cookie
Args:
new_cookies: 新的Cookie字典
Returns:
是否更新成功
"""
try:
if not new_cookies:
raise CookieException("新Cookie不能为空")
# 读取现有Cookie
existing_cookies = self.parse_cookies()
# 合并Cookie
existing_cookies.update(new_cookies)
# 转换为Cookie字符串
cookie_string = '; '.join(f"{k}={v}" for k, v in existing_cookies.items())
# 写入文件
return self.write_cookie(cookie_string)
except Exception as e:
self.logger.error(f"更新Cookie失败: {e}")
return False
def get_cookie_for_request(self) -> Dict[str, str]:
"""获取用于HTTP请求的Cookie字典
Returns:
适用于requests库的Cookie字典
"""
try:
cookies = self.parse_cookies()
# 过滤掉空值
filtered_cookies = {k: v for k, v in cookies.items() if k and v}
return filtered_cookies
except Exception as e:
self.logger.error(f"获取请求Cookie失败: {e}")
return {}
def format_cookie_string(self, cookies: Dict[str, str]) -> str:
"""将Cookie字典格式化为字符串
Args:
cookies: Cookie字典
Returns:
Cookie字符串
"""
if not cookies:
return ""
return '; '.join(f"{k}={v}" for k, v in cookies.items() if k and v)
def __str__(self) -> str:
"""字符串表示"""
info = self.get_cookie_info()
return f"CookieManager(file={info['file_path']}, valid={info['is_valid']}, count={info['cookie_count']})"
def __repr__(self) -> str:
"""详细字符串表示"""
return self.__str__()
if __name__ == "__main__":
# 测试代码
import sys
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
manager = CookieManager()
print("Cookie管理器模块")
print("支持的功能:")
print("- Cookie文件读写")
print("- Cookie格式验证")
print("- Cookie有效性检查")
print("- Cookie备份和恢复")
print("- Cookie信息查看")
# 显示当前Cookie信息
info = manager.get_cookie_info()
print(f"\n当前Cookie状态: {manager}")
print(f"详细信息: {info}")

854
main.py
View File

@ -1,224 +1,686 @@
import argparse
from flask import Flask, request, render_template, redirect, jsonify
from music_api import url_v1, name_v1, lyric_v1, search_music, playlist_detail, album_detail
from cookie_manager import CookieManager
"""网易云音乐API服务主程序
# ================= 工具函数 =================
cookie_manager = CookieManager()
提供网易云音乐相关API服务包括
- 歌曲信息获取
- 音乐搜索
- 歌单和专辑详情
- 音乐下载
- 健康检查
"""
def ids(ids: str) -> str:
if '163cn.tv' in ids:
import requests
response = requests.get(ids, allow_redirects=False)
ids = response.headers.get('Location')
if 'music.163.com' in ids:
index = ids.find('id=') + 3
ids = ids[index:].split('&')[0]
return ids
import os
import sys
import logging
import traceback
from typing import Dict, Any, Optional, Union, Tuple
from pathlib import Path
from dataclasses import dataclass
from urllib.parse import quote
def size(value: float) -> str:
units = ["B", "KB", "MB", "GB", "TB", "PB"]
size = 1024.0
for i in range(len(units)):
if (value / size) < 1:
return "%.2f%s" % (value, units[i])
value = value / size
return str(value)
from flask import Flask, request, jsonify, send_file, render_template, Response
from werkzeug.exceptions import BadRequest, NotFound, InternalServerError
def music_level1(value: str) -> str:
levels = {
'standard': "标准音质",
'exhigh': "极高音质",
'lossless': "无损音质",
'hires': "Hires音质",
'sky': "沉浸环绕声",
'jyeffect': "高清环绕声",
'jymaster': "超清母带"
}
return levels.get(value, "未知音质")
try:
from music_api import (
NeteaseAPI, APIException, QualityLevel,
url_v1, name_v1, lyric_v1, search_music,
playlist_detail, album_detail
)
from cookie_manager import CookieManager, CookieException
from music_downloader import MusicDownloader, DownloadException, AudioFormat
except ImportError as e:
print(f"导入模块失败: {e}")
print("请确保所有依赖模块存在且可用")
sys.exit(1)
# ================= Flask 应用 =================
@dataclass
class APIConfig:
"""API配置类"""
host: str = '0.0.0.0'
port: int = 5000
debug: bool = False
downloads_dir: str = 'downloads'
max_file_size: int = 500 * 1024 * 1024 # 500MB
request_timeout: int = 30
log_level: str = 'INFO'
cors_origins: str = '*'
class APIResponse:
"""API响应工具类"""
@staticmethod
def success(data: Any = None, message: str = 'success', status_code: int = 200) -> Tuple[Dict[str, Any], int]:
"""成功响应"""
response = {
'status': status_code,
'success': True,
'message': message
}
if data is not None:
response['data'] = data
return response, status_code
@staticmethod
def error(message: str, status_code: int = 400, error_code: str = None) -> Tuple[Dict[str, Any], int]:
"""错误响应"""
response = {
'status': status_code,
'success': False,
'message': message
}
if error_code:
response['error_code'] = error_code
return response, status_code
class MusicAPIService:
"""音乐API服务类"""
def __init__(self, config: APIConfig):
self.config = config
self.logger = self._setup_logger()
self.cookie_manager = CookieManager()
self.netease_api = NeteaseAPI()
self.downloader = MusicDownloader()
# 创建下载目录
self.downloads_path = Path(config.downloads_dir)
self.downloads_path.mkdir(exist_ok=True)
self.logger.info(f"音乐API服务初始化完成下载目录: {self.downloads_path.absolute()}")
def _setup_logger(self) -> logging.Logger:
"""设置日志记录器"""
logger = logging.getLogger('music_api')
logger.setLevel(getattr(logging, self.config.log_level.upper()))
if not logger.handlers:
# 控制台处理器
console_handler = logging.StreamHandler()
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# 文件处理器
try:
file_handler = logging.FileHandler('music_api.log', encoding='utf-8')
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
except Exception as e:
logger.warning(f"无法创建日志文件: {e}")
return logger
def _get_cookies(self) -> Dict[str, str]:
"""获取Cookie"""
try:
cookie_str = self.cookie_manager.read_cookie()
return self.cookie_manager.parse_cookie_string(cookie_str)
except CookieException as e:
self.logger.warning(f"获取Cookie失败: {e}")
return {}
except Exception as e:
self.logger.error(f"Cookie处理异常: {e}")
return {}
def _extract_music_id(self, id_or_url: str) -> str:
"""提取音乐ID"""
try:
# 处理短链接
if '163cn.tv' in id_or_url:
import requests
response = requests.get(id_or_url, allow_redirects=False, timeout=10)
id_or_url = response.headers.get('Location', id_or_url)
# 处理网易云链接
if 'music.163.com' in id_or_url:
index = id_or_url.find('id=') + 3
if index > 2:
return id_or_url[index:].split('&')[0]
# 直接返回ID
return str(id_or_url).strip()
except Exception as e:
self.logger.error(f"提取音乐ID失败: {e}")
return str(id_or_url).strip()
def _format_file_size(self, size_bytes: int) -> str:
"""格式化文件大小"""
if size_bytes == 0:
return "0B"
units = ["B", "KB", "MB", "GB", "TB"]
size = float(size_bytes)
unit_index = 0
while size >= 1024.0 and unit_index < len(units) - 1:
size /= 1024.0
unit_index += 1
return f"{size:.2f}{units[unit_index]}"
def _get_quality_display_name(self, quality: str) -> str:
"""获取音质显示名称"""
quality_names = {
'standard': "标准音质",
'exhigh': "极高音质",
'lossless': "无损音质",
'hires': "Hi-Res音质",
'sky': "沉浸环绕声",
'jyeffect': "高清环绕声",
'jymaster': "超清母带"
}
return quality_names.get(quality, f"未知音质({quality})")
def _validate_request_params(self, required_params: Dict[str, Any]) -> Optional[Tuple[Dict[str, Any], int]]:
"""验证请求参数"""
for param_name, param_value in required_params.items():
if not param_value:
return APIResponse.error(f"参数 '{param_name}' 不能为空", 400)
return None
def _safe_get_request_data(self) -> Dict[str, Any]:
"""安全获取请求数据"""
try:
if request.method == 'GET':
return dict(request.args)
else:
# 优先使用JSON数据然后是表单数据
json_data = request.get_json(silent=True) or {}
form_data = dict(request.form)
# 合并数据JSON优先
return {**form_data, **json_data}
except Exception as e:
self.logger.error(f"获取请求数据失败: {e}")
return {}
# 创建Flask应用和服务实例
config = APIConfig()
app = Flask(__name__)
api_service = MusicAPIService(config)
@app.before_request
def before_request():
"""请求前处理"""
# 记录请求信息
api_service.logger.info(
f"{request.method} {request.path} - IP: {request.remote_addr} - "
f"User-Agent: {request.headers.get('User-Agent', 'Unknown')}"
)
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin', '*')
def after_request(response: Response) -> Response:
"""请求后处理 - 设置CORS头"""
response.headers.add('Access-Control-Allow-Origin', config.cors_origins)
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,POST,OPTIONS')
response.headers.add('Access-Control-Max-Age', '3600')
# 记录响应信息
api_service.logger.info(f"响应状态: {response.status_code}")
return response
@app.route('/', methods=['GET', 'POST'])
def index():
@app.errorhandler(400)
def handle_bad_request(e):
"""处理400错误"""
return APIResponse.error("请求参数错误", 400)
@app.errorhandler(404)
def handle_not_found(e):
"""处理404错误"""
return APIResponse.error("请求的资源不存在", 404)
@app.errorhandler(500)
def handle_internal_error(e):
"""处理500错误"""
api_service.logger.error(f"服务器内部错误: {e}")
return APIResponse.error("服务器内部错误", 500)
@app.route('/')
def index() -> str:
"""首页路由"""
return render_template('index.html')
@app.route('/Song_V1', methods=['GET', 'POST'])
def Song_v1():
# 参数获取
if request.method == 'GET':
song_ids = request.args.get('ids')
url = request.args.get('url')
level = request.args.get('level')
type_ = request.args.get('type')
else:
song_ids = request.form.get('ids')
url = request.form.get('url')
level = request.form.get('level')
type_ = request.form.get('type')
# 参数校验
if not song_ids and not url:
return jsonify({'error': '必须提供 ids 或 url 参数'}), 400
if not level:
return jsonify({'error': 'level参数为空'}), 400
if not type_:
return jsonify({'error': 'type参数为空'}), 400
jsondata = song_ids if song_ids else url
cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie())
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查API"""
try:
song_id = ids(jsondata)
urlv1 = url_v1(song_id, level, cookies)
if not urlv1['data'] or urlv1['data'][0]['url'] is None:
return jsonify({"status": 400, 'msg': '信息获取不完整!'}), 400
namev1 = name_v1(urlv1['data'][0]['id'])
lyricv1 = lyric_v1(urlv1['data'][0]['id'], cookies)
song_data = urlv1['data'][0]
song_info = namev1['songs'][0] if namev1['songs'] else {}
song_url = song_data['url']
song_name = song_info.get('name', '')
song_picUrl = song_info.get('al', {}).get('picUrl', '')
song_alname = song_info.get('al', {}).get('name', '')
# 歌手名拼接
artist_names = []
for song in namev1['songs']:
ar_list = song.get('ar', [])
if ar_list:
artist_names.append('/'.join(ar['name'] for ar in ar_list))
song_arname = ', '.join(artist_names)
# 歌词
lyric = lyricv1.get('lrc', {}).get('lyric', '')
tlyric = lyricv1.get('tlyric', {}).get('lyric', None)
except Exception as e:
return jsonify({'status': 500, 'msg': f'服务异常: {str(e)}'}), 500
# 响应类型
if type_ == 'text':
data = f'歌曲名称:{song_name}<br>歌曲图片:{song_picUrl}<br>歌手:{song_arname}<br>歌曲专辑:{song_alname}<br>歌曲音质:{music_level1(song_data["level"])}<br>歌曲大小:{size(song_data["size"])}<br>音乐地址:{song_url}'
elif type_ == 'down':
data = redirect(song_url)
elif type_ == 'json':
data = {
"status": 200,
"name": song_name,
"pic": song_picUrl,
"ar_name": song_arname,
"al_name": song_alname,
"level": music_level1(song_data["level"]),
"size": size(song_data["size"]),
"url": song_url.replace("http://", "https://", 1),
"lyric": lyric,
"tlyric": tlyric
# 检查Cookie状态
cookie_status = api_service.cookie_manager.is_cookie_valid()
health_info = {
'service': 'running',
'timestamp': int(time.time()) if 'time' in sys.modules else None,
'cookie_status': 'valid' if cookie_status else 'invalid',
'downloads_dir': str(api_service.downloads_path.absolute()),
'version': '2.0.0'
}
data = jsonify(data)
else:
data = jsonify({"status": 400, 'msg': '解析失败!请检查参数是否完整!'}), 400
return data
@app.route('/Search', methods=['GET', 'POST'])
def search():
if request.method == 'GET':
keywords = request.args.get('keywords')
limit = request.args.get('limit', default=10, type=int)
else:
keywords = request.form.get('keywords')
limit = int(request.form.get('limit', 10))
if not keywords:
return jsonify({'error': '必须提供 keywords 参数'}), 400
cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie())
try:
songs = search_music(keywords, cookies, limit=limit)
return jsonify({'status': 200, 'result': songs})
return APIResponse.success(health_info, "API服务运行正常")
except Exception as e:
return jsonify({'status': 500, 'msg': f'搜索异常: {str(e)}'}), 500
api_service.logger.error(f"健康检查失败: {e}")
return APIResponse.error(f"健康检查失败: {str(e)}", 500)
@app.route('/Playlist', methods=['GET', 'POST'])
def playlist():
if request.method == 'GET':
playlist_id = request.args.get('id')
else:
playlist_id = request.form.get('id')
if not playlist_id:
return jsonify({'error': '必须提供歌单id参数'}), 400
cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie())
@app.route('/song', methods=['GET', 'POST'])
@app.route('/Song_V1', methods=['GET', 'POST']) # 向后兼容
def get_song_info():
"""获取歌曲信息API"""
try:
info = playlist_detail(playlist_id, cookies)
return jsonify({'status': 200, 'playlist': info})
# 获取请求参数
data = api_service._safe_get_request_data()
song_ids = data.get('ids') or data.get('id')
url = data.get('url')
level = data.get('level', 'lossless')
info_type = data.get('type', 'url')
# 参数验证
if not song_ids and not url:
return APIResponse.error("必须提供 'ids''id''url' 参数")
# 提取音乐ID
music_id = api_service._extract_music_id(song_ids or url)
# 验证音质参数
valid_levels = ['standard', 'exhigh', 'lossless', 'hires', 'sky', 'jyeffect', 'jymaster']
if level not in valid_levels:
return APIResponse.error(f"无效的音质参数,支持: {', '.join(valid_levels)}")
# 验证类型参数
valid_types = ['url', 'name', 'lyric', 'json']
if info_type not in valid_types:
return APIResponse.error(f"无效的类型参数,支持: {', '.join(valid_types)}")
cookies = api_service._get_cookies()
# 根据类型获取不同信息
if info_type == 'url':
result = url_v1(music_id, level, cookies)
if result and result.get('data') and len(result['data']) > 0:
song_data = result['data'][0]
response_data = {
'id': song_data.get('id'),
'url': song_data.get('url'),
'level': song_data.get('level'),
'quality_name': api_service._get_quality_display_name(song_data.get('level', level)),
'size': song_data.get('size'),
'size_formatted': api_service._format_file_size(song_data.get('size', 0)),
'type': song_data.get('type'),
'bitrate': song_data.get('br')
}
return APIResponse.success(response_data, "获取歌曲URL成功")
else:
return APIResponse.error("获取音乐URL失败可能是版权限制或音质不支持", 404)
elif info_type == 'name':
result = name_v1(music_id)
return APIResponse.success(result, "获取歌曲信息成功")
elif info_type == 'lyric':
result = lyric_v1(music_id, cookies)
return APIResponse.success(result, "获取歌词成功")
elif info_type == 'json':
# 获取完整的歌曲信息(用于前端解析)
song_info = name_v1(music_id)
url_info = url_v1(music_id, level, cookies)
lyric_info = lyric_v1(music_id, cookies)
if not song_info or 'songs' not in song_info or not song_info['songs']:
return APIResponse.error("未找到歌曲信息", 404)
song_data = song_info['songs'][0]
# 构建前端期望的响应格式
response_data = {
'id': music_id,
'name': song_data.get('name', ''),
'ar_name': ', '.join(artist['name'] for artist in song_data.get('ar', [])),
'al_name': song_data.get('al', {}).get('name', ''),
'pic': song_data.get('al', {}).get('picUrl', ''),
'level': level,
'lyric': lyric_info.get('lrc', {}).get('lyric', '') if lyric_info else '',
'tlyric': lyric_info.get('tlyric', {}).get('lyric', '') if lyric_info else ''
}
# 添加URL和大小信息
if url_info and url_info.get('data') and len(url_info['data']) > 0:
url_data = url_info['data'][0]
response_data.update({
'url': url_data.get('url', ''),
'size': api_service._format_file_size(url_data.get('size', 0)),
'level': url_data.get('level', level)
})
else:
response_data.update({
'url': '',
'size': '获取失败'
})
return APIResponse.success(response_data, "获取歌曲信息成功")
except APIException as e:
api_service.logger.error(f"API调用失败: {e}")
return APIResponse.error(f"API调用失败: {str(e)}", 500)
except Exception as e:
return jsonify({'status': 500, 'msg': f'歌单解析异常: {str(e)}'}), 500
api_service.logger.error(f"获取歌曲信息异常: {e}\n{traceback.format_exc()}")
return APIResponse.error(f"服务器错误: {str(e)}", 500)
@app.route('/Album', methods=['GET', 'POST'])
def album():
if request.method == 'GET':
album_id = request.args.get('id')
else:
album_id = request.form.get('id')
if not album_id:
return jsonify({'error': '必须提供专辑id参数'}), 400
cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie())
@app.route('/search', methods=['GET', 'POST'])
@app.route('/Search', methods=['GET', 'POST']) # 向后兼容
def search_music_api():
"""搜索音乐API"""
try:
info = album_detail(album_id, cookies)
return jsonify({'status': 200, 'album': info})
# 获取请求参数
data = api_service._safe_get_request_data()
keyword = data.get('keyword') or data.get('keywords') or data.get('q')
limit = int(data.get('limit', 30))
offset = int(data.get('offset', 0))
search_type = data.get('type', '1') # 1-歌曲, 10-专辑, 100-歌手, 1000-歌单
# 参数验证
validation_error = api_service._validate_request_params({'keyword': keyword})
if validation_error:
return validation_error
# 限制搜索数量
if limit > 100:
limit = 100
cookies = api_service._get_cookies()
result = search_music(keyword, cookies, limit)
# search_music返回的是歌曲列表需要包装成前端期望的格式
if result:
for song in result:
# 添加艺术家字符串(如果需要)
if 'artists' in song:
song['artist_string'] = song['artists']
return APIResponse.success(result, "搜索完成")
except ValueError as e:
return APIResponse.error(f"参数格式错误: {str(e)}")
except Exception as e:
return jsonify({'status': 500, 'msg': f'专辑解析异常: {str(e)}'}), 500
api_service.logger.error(f"搜索音乐异常: {e}\n{traceback.format_exc()}")
return APIResponse.error(f"搜索失败: {str(e)}", 500)
# ================= 命令行启动 =================
def start_gui(url: str = None, level: str = 'lossless'):
if url:
print(f"正在处理 URL: {url},音质:{level}")
cookies = cookie_manager.parse_cookie(cookie_manager.read_cookie())
try:
song_ids = ids(url)
urlv1 = url_v1(song_ids, level, cookies)
namev1 = name_v1(urlv1['data'][0]['id'])
lyricv1 = lyric_v1(urlv1['data'][0]['id'], cookies)
song_info = namev1['songs'][0]
song_name = song_info['name']
song_pic = song_info['al']['picUrl']
artist_names = ', '.join(artist['name'] for artist in song_info['ar'])
album_name = song_info['al']['name']
music_quality = music_level1(urlv1['data'][0]['level'])
file_size = size(urlv1['data'][0]['size'])
music_url = urlv1['data'][0]['url']
lyrics = lyricv1.get('lrc', {}).get('lyric', '')
translated_lyrics = lyricv1.get('tlyric', {}).get('lyric', None)
output_text = f"""
歌曲名称: {song_name}
歌曲图片: {song_pic}
歌手: {artist_names}
专辑名称: {album_name}
音质: {music_quality}
大小: {file_size}
音乐链接: {music_url}
歌词: {lyrics}
翻译歌词: {translated_lyrics if translated_lyrics else '没有翻译歌词'}
"""
print(output_text)
except Exception as e:
print(f"发生错误: {e}")
else:
print("没有提供 URL 参数")
def start_api():
app.run(host='0.0.0.0', port=5000, debug=False)
@app.route('/playlist', methods=['GET', 'POST'])
@app.route('/Playlist', methods=['GET', 'POST']) # 向后兼容
def get_playlist():
"""获取歌单详情API"""
try:
# 获取请求参数
data = api_service._safe_get_request_data()
playlist_id = data.get('id')
# 参数验证
validation_error = api_service._validate_request_params({'playlist_id': playlist_id})
if validation_error:
return validation_error
cookies = api_service._get_cookies()
result = playlist_detail(playlist_id, cookies)
# 适配前端期望的响应格式
response_data = {
'status': 'success',
'playlist': result
}
return APIResponse.success(response_data, "获取歌单详情成功")
except Exception as e:
api_service.logger.error(f"获取歌单异常: {e}\n{traceback.format_exc()}")
return APIResponse.error(f"获取歌单失败: {str(e)}", 500)
@app.route('/album', methods=['GET', 'POST'])
@app.route('/Album', methods=['GET', 'POST']) # 向后兼容
def get_album():
"""获取专辑详情API"""
try:
# 获取请求参数
data = api_service._safe_get_request_data()
album_id = data.get('id')
# 参数验证
validation_error = api_service._validate_request_params({'album_id': album_id})
if validation_error:
return validation_error
cookies = api_service._get_cookies()
result = album_detail(album_id, cookies)
# 适配前端期望的响应格式
response_data = {
'status': 200,
'album': result
}
return APIResponse.success(response_data, "获取专辑详情成功")
except Exception as e:
api_service.logger.error(f"获取专辑异常: {e}\n{traceback.format_exc()}")
return APIResponse.error(f"获取专辑失败: {str(e)}", 500)
@app.route('/download', methods=['GET', 'POST'])
@app.route('/Download', methods=['GET', 'POST']) # 向后兼容
def download_music_api():
"""下载音乐API"""
try:
# 获取请求参数
data = api_service._safe_get_request_data()
music_id = data.get('id')
quality = data.get('quality', 'lossless')
return_format = data.get('format', 'file') # file 或 json
# 参数验证
validation_error = api_service._validate_request_params({'music_id': music_id})
if validation_error:
return validation_error
# 验证音质参数
valid_qualities = ['standard', 'exhigh', 'lossless', 'hires', 'sky', 'jyeffect', 'jymaster']
if quality not in valid_qualities:
return APIResponse.error(f"无效的音质参数,支持: {', '.join(valid_qualities)}")
# 验证返回格式
if return_format not in ['file', 'json']:
return APIResponse.error("返回格式只支持 'file''json'")
music_id = api_service._extract_music_id(music_id)
cookies = api_service._get_cookies()
# 获取音乐基本信息
song_info = name_v1(music_id)
if not song_info or 'songs' not in song_info or not song_info['songs']:
return APIResponse.error("未找到音乐信息", 404)
# 获取音乐下载链接
url_info = url_v1(music_id, quality, cookies)
if not url_info or 'data' not in url_info or not url_info['data'] or not url_info['data'][0].get('url'):
return APIResponse.error("无法获取音乐下载链接,可能是版权限制或音质不支持", 404)
# 构建音乐信息
song_data = song_info['songs'][0]
url_data = url_info['data'][0]
music_info = {
'id': music_id,
'name': song_data['name'],
'artist_string': ', '.join(artist['name'] for artist in song_data['ar']),
'album': song_data['al']['name'],
'pic_url': song_data['al']['picUrl'],
'file_type': url_data['type'],
'file_size': url_data['size'],
'duration': song_data.get('dt', 0),
'download_url': url_data['url']
}
# 生成安全文件名
safe_name = f"{music_info['name']} [{quality}]"
safe_name = ''.join(c for c in safe_name if c not in r'<>:"/\|?*')
filename = f"{safe_name}.{music_info['file_type']}"
file_path = api_service.downloads_path / filename
# 检查文件是否已存在
if file_path.exists():
api_service.logger.info(f"文件已存在: {filename}")
else:
# 使用优化后的下载器下载
try:
download_result = api_service.downloader.download_music_file(
music_id, quality
)
if not download_result.success:
return APIResponse.error(f"下载失败: {download_result.error_message}", 500)
file_path = Path(download_result.file_path)
api_service.logger.info(f"下载完成: {filename}")
except DownloadException as e:
api_service.logger.error(f"下载异常: {e}")
return APIResponse.error(f"下载失败: {str(e)}", 500)
# 根据返回格式返回结果
if return_format == 'json':
response_data = {
'music_id': music_id,
'name': music_info['name'],
'artist': music_info['artist_string'],
'album': music_info['album'],
'quality': quality,
'quality_name': api_service._get_quality_display_name(quality),
'file_type': music_info['file_type'],
'file_size': music_info['file_size'],
'file_size_formatted': api_service._format_file_size(music_info['file_size']),
'file_path': str(file_path.absolute()),
'filename': filename,
'duration': music_info['duration']
}
return APIResponse.success(response_data, "下载完成")
else:
# 返回文件下载
if not file_path.exists():
return APIResponse.error("文件不存在", 404)
try:
response = send_file(
str(file_path),
as_attachment=True,
download_name=filename,
mimetype=f"audio/{music_info['file_type']}"
)
response.headers['X-Download-Message'] = 'Download completed successfully'
response.headers['X-Download-Filename'] = quote(filename, safe='')
return response
except Exception as e:
api_service.logger.error(f"发送文件失败: {e}")
return APIResponse.error(f"文件发送失败: {str(e)}", 500)
except Exception as e:
api_service.logger.error(f"下载音乐异常: {e}\n{traceback.format_exc()}")
return APIResponse.error(f"下载异常: {str(e)}", 500)
@app.route('/api/info', methods=['GET'])
def api_info():
"""API信息接口"""
try:
info = {
'name': '网易云音乐API服务',
'version': '2.0.0',
'description': '提供网易云音乐相关API服务',
'endpoints': {
'/health': 'GET - 健康检查',
'/song': 'GET/POST - 获取歌曲信息',
'/search': 'GET/POST - 搜索音乐',
'/playlist': 'GET/POST - 获取歌单详情',
'/album': 'GET/POST - 获取专辑详情',
'/download': 'GET/POST - 下载音乐',
'/api/info': 'GET - API信息'
},
'supported_qualities': [
'standard', 'exhigh', 'lossless',
'hires', 'sky', 'jyeffect', 'jymaster'
],
'config': {
'downloads_dir': str(api_service.downloads_path.absolute()),
'max_file_size': f"{config.max_file_size // (1024*1024)}MB",
'request_timeout': f"{config.request_timeout}s"
}
}
return APIResponse.success(info, "API信息获取成功")
except Exception as e:
api_service.logger.error(f"获取API信息异常: {e}")
return APIResponse.error(f"获取API信息失败: {str(e)}", 500)
def start_api_server():
"""启动API服务器"""
try:
import time
print("\n" + "="*60)
print("🚀 网易云音乐API服务启动中...")
print("="*60)
print(f"📡 服务地址: http://{config.host}:{config.port}")
print(f"📁 下载目录: {api_service.downloads_path.absolute()}")
print(f"📋 日志级别: {config.log_level}")
print("\n📚 API端点:")
print(f" ├─ GET /health - 健康检查")
print(f" ├─ POST /song - 获取歌曲信息")
print(f" ├─ POST /search - 搜索音乐")
print(f" ├─ POST /playlist - 获取歌单详情")
print(f" ├─ POST /album - 获取专辑详情")
print(f" ├─ POST /download - 下载音乐")
print(f" └─ GET /api/info - API信息")
print("\n🎵 支持的音质:")
print(f" standard, exhigh, lossless, hires, sky, jyeffect, jymaster")
print("="*60)
print(f"⏰ 启动时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("🌟 服务已就绪,等待请求...\n")
# 启动Flask应用
app.run(
host=config.host,
port=config.port,
debug=config.debug,
threaded=True
)
except KeyboardInterrupt:
print("\n\n👋 服务已停止")
except Exception as e:
api_service.logger.error(f"启动服务失败: {e}")
print(f"❌ 启动失败: {e}")
sys.exit(1)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="启动 API 或 GUI")
parser.add_argument('--mode', choices=['api', 'gui'], help="选择启动模式api 或 gui")
parser.add_argument('--url', help="提供 URL 参数供 GUI 模式使用")
parser.add_argument('--level', default='lossless', choices=['standard', 'exhigh', 'lossless', 'hires', 'sky', 'jyeffect', 'jymaster'], help="选择音质等级,默认是 lossless")
args = parser.parse_args()
if args.mode == 'api':
start_api()
elif args.mode == 'gui':
start_gui(args.url, args.level)
start_api_server()

File diff suppressed because it is too large Load Diff

587
music_downloader.py Normal file
View File

@ -0,0 +1,587 @@
"""音乐下载器模块
提供网易云音乐下载功能包括
- 音乐信息获取
- 文件下载到本地
- 内存下载
- 音乐标签写入
- 异步下载支持
"""
import os
import re
import asyncio
import aiohttp
import aiofiles
from io import BytesIO
from typing import Dict, List, Optional, Tuple, Any, Union
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
import requests
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TRCK, APIC
from mutagen.mp4 import MP4
from music_api import NeteaseAPI, APIException
from cookie_manager import CookieManager
class AudioFormat(Enum):
"""音频格式枚举"""
MP3 = "mp3"
FLAC = "flac"
M4A = "m4a"
UNKNOWN = "unknown"
class QualityLevel(Enum):
"""音质等级枚举"""
STANDARD = "standard" # 标准
EXHIGH = "exhigh" # 极高
LOSSLESS = "lossless" # 无损
HIRES = "hires" # Hi-Res
SKY = "sky" # 沉浸环绕声
JYEFFECT = "jyeffect" # 高清环绕声
JYMASTER = "jymaster" # 超清母带
@dataclass
class MusicInfo:
"""音乐信息数据类"""
id: int
name: str
artists: str
album: str
pic_url: str
duration: int
track_number: int
download_url: str
file_type: str
file_size: int
quality: str
lyric: str = ""
tlyric: str = ""
@dataclass
class DownloadResult:
"""下载结果数据类"""
success: bool
file_path: Optional[str] = None
file_size: int = 0
error_message: str = ""
music_info: Optional[MusicInfo] = None
class DownloadException(Exception):
"""下载异常类"""
pass
class MusicDownloader:
"""音乐下载器主类"""
def __init__(self, download_dir: str = "downloads", max_concurrent: int = 3):
"""
初始化音乐下载器
Args:
download_dir: 下载目录
max_concurrent: 最大并发下载数
"""
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
self.max_concurrent = max_concurrent
# 初始化依赖
self.cookie_manager = CookieManager()
self.api = NeteaseAPI()
# 支持的文件格式
self.supported_formats = {
'mp3': AudioFormat.MP3,
'flac': AudioFormat.FLAC,
'm4a': AudioFormat.M4A
}
def _sanitize_filename(self, filename: str) -> str:
"""清理文件名,移除非法字符
Args:
filename: 原始文件名
Returns:
清理后的安全文件名
"""
# 移除或替换非法字符
illegal_chars = r'[<>:"/\\|?*]'
filename = re.sub(illegal_chars, '_', filename)
# 移除前后空格和点
filename = filename.strip(' .')
# 限制长度
if len(filename) > 200:
filename = filename[:200]
return filename or "unknown"
def _determine_file_extension(self, url: str, content_type: str = "") -> str:
"""根据URL和Content-Type确定文件扩展名
Args:
url: 下载URL
content_type: HTTP Content-Type头
Returns:
文件扩展名
"""
# 首先尝试从URL获取
if '.flac' in url.lower():
return '.flac'
elif '.mp3' in url.lower():
return '.mp3'
elif '.m4a' in url.lower():
return '.m4a'
# 从Content-Type获取
content_type = content_type.lower()
if 'flac' in content_type:
return '.flac'
elif 'mpeg' in content_type or 'mp3' in content_type:
return '.mp3'
elif 'mp4' in content_type or 'm4a' in content_type:
return '.m4a'
return '.mp3' # 默认
def get_music_info(self, music_id: int, quality: str = "standard") -> MusicInfo:
"""获取音乐详细信息
Args:
music_id: 音乐ID
quality: 音质等级
Returns:
音乐信息对象
Raises:
DownloadException: 获取信息失败时抛出
"""
try:
# 获取cookies
cookies = self.cookie_manager.parse_cookies()
# 获取音乐URL信息
url_result = self.api.get_song_url(music_id, quality, cookies)
if not url_result.get('data') or not url_result['data']:
raise DownloadException(f"无法获取音乐ID {music_id} 的播放链接")
song_data = url_result['data'][0]
download_url = song_data.get('url', '')
if not download_url:
raise DownloadException(f"音乐ID {music_id} 无可用的下载链接")
# 获取音乐详情
detail_result = self.api.get_song_detail(music_id)
if not detail_result.get('songs') or not detail_result['songs']:
raise DownloadException(f"无法获取音乐ID {music_id} 的详细信息")
song_detail = detail_result['songs'][0]
# 获取歌词
lyric_result = self.api.get_lyric(music_id, cookies)
lyric = lyric_result.get('lrc', {}).get('lyric', '') if lyric_result else ''
tlyric = lyric_result.get('tlyric', {}).get('lyric', '') if lyric_result else ''
# 构建艺术家字符串
artists = '/'.join(artist['name'] for artist in song_detail.get('ar', []))
# 创建MusicInfo对象
music_info = MusicInfo(
id=music_id,
name=song_detail.get('name', '未知歌曲'),
artists=artists or '未知艺术家',
album=song_detail.get('al', {}).get('name', '未知专辑'),
pic_url=song_detail.get('al', {}).get('picUrl', ''),
duration=song_detail.get('dt', 0) // 1000, # 转换为秒
track_number=song_detail.get('no', 0),
download_url=download_url,
file_type=song_data.get('type', 'mp3').lower(),
file_size=song_data.get('size', 0),
quality=quality,
lyric=lyric,
tlyric=tlyric
)
return music_info
except APIException as e:
raise DownloadException(f"API调用失败: {e}")
except Exception as e:
raise DownloadException(f"获取音乐信息时发生错误: {e}")
def download_music_file(self, music_id: int, quality: str = "standard") -> DownloadResult:
"""下载音乐文件到本地
Args:
music_id: 音乐ID
quality: 音质等级
Returns:
下载结果对象
"""
try:
# 获取音乐信息
music_info = self.get_music_info(music_id, quality)
# 生成文件名
filename = f"{music_info.artists} - {music_info.name}"
safe_filename = self._sanitize_filename(filename)
# 确定文件扩展名
file_ext = self._determine_file_extension(music_info.download_url)
file_path = self.download_dir / f"{safe_filename}{file_ext}"
# 检查文件是否已存在
if file_path.exists():
return DownloadResult(
success=True,
file_path=str(file_path),
file_size=file_path.stat().st_size,
music_info=music_info
)
# 下载文件
response = requests.get(music_info.download_url, stream=True, timeout=30)
response.raise_for_status()
# 写入文件
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# 写入音乐标签
self._write_music_tags(file_path, music_info)
return DownloadResult(
success=True,
file_path=str(file_path),
file_size=file_path.stat().st_size,
music_info=music_info
)
except DownloadException:
raise
except requests.RequestException as e:
return DownloadResult(
success=False,
error_message=f"下载请求失败: {e}"
)
except Exception as e:
return DownloadResult(
success=False,
error_message=f"下载过程中发生错误: {e}"
)
async def download_music_file_async(self, music_id: int, quality: str = "standard") -> DownloadResult:
"""异步下载音乐文件到本地
Args:
music_id: 音乐ID
quality: 音质等级
Returns:
下载结果对象
"""
try:
# 获取音乐信息(同步操作)
music_info = self.get_music_info(music_id, quality)
# 生成文件名
filename = f"{music_info.artists} - {music_info.name}"
safe_filename = self._sanitize_filename(filename)
# 确定文件扩展名
file_ext = self._determine_file_extension(music_info.download_url)
file_path = self.download_dir / f"{safe_filename}{file_ext}"
# 检查文件是否已存在
if file_path.exists():
return DownloadResult(
success=True,
file_path=str(file_path),
file_size=file_path.stat().st_size,
music_info=music_info
)
# 异步下载文件
async with aiohttp.ClientSession() as session:
async with session.get(music_info.download_url) as response:
response.raise_for_status()
async with aiofiles.open(file_path, 'wb') as f:
async for chunk in response.content.iter_chunked(8192):
await f.write(chunk)
# 写入音乐标签
self._write_music_tags(file_path, music_info)
return DownloadResult(
success=True,
file_path=str(file_path),
file_size=file_path.stat().st_size,
music_info=music_info
)
except DownloadException:
raise
except aiohttp.ClientError as e:
return DownloadResult(
success=False,
error_message=f"异步下载请求失败: {e}"
)
except Exception as e:
return DownloadResult(
success=False,
error_message=f"异步下载过程中发生错误: {e}"
)
def download_music_to_memory(self, music_id: int, quality: str = "standard") -> Tuple[bool, BytesIO, MusicInfo]:
"""下载音乐到内存
Args:
music_id: 音乐ID
quality: 音质等级
Returns:
(是否成功, 音乐数据流, 音乐信息)
Raises:
DownloadException: 下载失败时抛出
"""
try:
# 获取音乐信息
music_info = self.get_music_info(music_id, quality)
# 下载到内存
response = requests.get(music_info.download_url, timeout=30)
response.raise_for_status()
# 创建BytesIO对象
audio_data = BytesIO(response.content)
return True, audio_data, music_info
except DownloadException:
raise
except requests.RequestException as e:
raise DownloadException(f"下载到内存失败: {e}")
except Exception as e:
raise DownloadException(f"内存下载过程中发生错误: {e}")
async def download_batch_async(self, music_ids: List[int], quality: str = "standard") -> List[DownloadResult]:
"""批量异步下载音乐
Args:
music_ids: 音乐ID列表
quality: 音质等级
Returns:
下载结果列表
"""
semaphore = asyncio.Semaphore(self.max_concurrent)
async def download_with_semaphore(music_id: int) -> DownloadResult:
async with semaphore:
return await self.download_music_file_async(music_id, quality)
tasks = [download_with_semaphore(music_id) for music_id in music_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常结果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append(DownloadResult(
success=False,
error_message=f"下载音乐ID {music_ids[i]} 时发生异常: {result}"
))
else:
processed_results.append(result)
return processed_results
def _write_music_tags(self, file_path: Path, music_info: MusicInfo) -> None:
"""写入音乐标签信息
Args:
file_path: 音乐文件路径
music_info: 音乐信息
"""
try:
file_ext = file_path.suffix.lower()
if file_ext == '.mp3':
self._write_mp3_tags(file_path, music_info)
elif file_ext == '.flac':
self._write_flac_tags(file_path, music_info)
elif file_ext == '.m4a':
self._write_m4a_tags(file_path, music_info)
except Exception as e:
print(f"写入音乐标签失败: {e}")
def _write_mp3_tags(self, file_path: Path, music_info: MusicInfo) -> None:
"""写入MP3标签"""
try:
audio = MP3(str(file_path), ID3=ID3)
# 添加ID3标签
audio.tags.add(TIT2(encoding=3, text=music_info.name))
audio.tags.add(TPE1(encoding=3, text=music_info.artists))
audio.tags.add(TALB(encoding=3, text=music_info.album))
if music_info.track_number > 0:
audio.tags.add(TRCK(encoding=3, text=str(music_info.track_number)))
# 下载并添加封面
if music_info.pic_url:
try:
pic_response = requests.get(music_info.pic_url, timeout=10)
pic_response.raise_for_status()
audio.tags.add(APIC(
encoding=3,
mime='image/jpeg',
type=3,
desc='Cover',
data=pic_response.content
))
except:
pass # 封面下载失败不影响主流程
audio.save()
except Exception as e:
print(f"写入MP3标签失败: {e}")
def _write_flac_tags(self, file_path: Path, music_info: MusicInfo) -> None:
"""写入FLAC标签"""
try:
audio = FLAC(str(file_path))
audio['TITLE'] = music_info.name
audio['ARTIST'] = music_info.artists
audio['ALBUM'] = music_info.album
if music_info.track_number > 0:
audio['TRACKNUMBER'] = str(music_info.track_number)
# 下载并添加封面
if music_info.pic_url:
try:
pic_response = requests.get(music_info.pic_url, timeout=10)
pic_response.raise_for_status()
from mutagen.flac import Picture
picture = Picture()
picture.type = 3 # Cover (front)
picture.mime = 'image/jpeg'
picture.desc = 'Cover'
picture.data = pic_response.content
audio.add_picture(picture)
except:
pass # 封面下载失败不影响主流程
audio.save()
except Exception as e:
print(f"写入FLAC标签失败: {e}")
def _write_m4a_tags(self, file_path: Path, music_info: MusicInfo) -> None:
"""写入M4A标签"""
try:
audio = MP4(str(file_path))
audio['\xa9nam'] = music_info.name
audio['\xa9ART'] = music_info.artists
audio['\xa9alb'] = music_info.album
if music_info.track_number > 0:
audio['trkn'] = [(music_info.track_number, 0)]
# 下载并添加封面
if music_info.pic_url:
try:
pic_response = requests.get(music_info.pic_url, timeout=10)
pic_response.raise_for_status()
audio['covr'] = [pic_response.content]
except:
pass # 封面下载失败不影响主流程
audio.save()
except Exception as e:
print(f"写入M4A标签失败: {e}")
def get_download_progress(self, music_id: int, quality: str = "standard") -> Dict[str, Any]:
"""获取下载进度信息
Args:
music_id: 音乐ID
quality: 音质等级
Returns:
包含进度信息的字典
"""
try:
music_info = self.get_music_info(music_id, quality)
filename = f"{music_info.artists} - {music_info.name}"
safe_filename = self._sanitize_filename(filename)
file_ext = self._determine_file_extension(music_info.download_url)
file_path = self.download_dir / f"{safe_filename}{file_ext}"
if file_path.exists():
current_size = file_path.stat().st_size
progress = (current_size / music_info.file_size * 100) if music_info.file_size > 0 else 0
return {
'music_id': music_id,
'filename': safe_filename + file_ext,
'total_size': music_info.file_size,
'current_size': current_size,
'progress': min(progress, 100),
'completed': current_size >= music_info.file_size
}
else:
return {
'music_id': music_id,
'filename': safe_filename + file_ext,
'total_size': music_info.file_size,
'current_size': 0,
'progress': 0,
'completed': False
}
except Exception as e:
return {
'music_id': music_id,
'error': str(e),
'progress': 0,
'completed': False
}
if __name__ == "__main__":
# 测试代码
downloader = MusicDownloader()
print("音乐下载器模块")
print("支持的功能:")
print("- 同步下载")
print("- 异步下载")
print("- 批量下载")
print("- 内存下载")
print("- 音乐标签写入")
print("- 下载进度跟踪")

View File

@ -1,9 +1,364 @@
from music_api import qr_login
print("开始网易云音乐二维码登录流程...")
cookies = qr_login()
if cookies:
print("\nCookie信息")
print(cookies)
else:
print("登录失败,请重试。")
"""网易云音乐二维码登录模块
提供网易云音乐二维码登录功能包括
- 二维码生成和显示
- 登录状态检查
- Cookie获取和保存
- 用户友好的交互界面
"""
import sys
import time
import logging
from typing import Optional, Dict, Any, Tuple
from pathlib import Path
try:
from music_api import QRLoginManager, APIException
from cookie_manager import CookieManager, CookieException
except ImportError as e:
print(f"导入模块失败: {e}")
print("请确保 music_api.py 和 cookie_manager.py 文件存在且可用")
sys.exit(1)
class QRLoginClient:
"""二维码登录客户端"""
def __init__(self, cookie_file: str = "cookie.txt"):
"""
初始化二维码登录客户端
Args:
cookie_file: Cookie保存文件路径
"""
self.cookie_manager = CookieManager(cookie_file)
self.qr_manager = QRLoginManager()
self.logger = logging.getLogger(__name__)
# 配置日志
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def check_existing_login(self) -> bool:
"""检查是否已有有效登录
Returns:
是否已登录
"""
try:
if self.cookie_manager.is_cookie_valid():
self.logger.info("检测到有效的登录Cookie")
return True
else:
self.logger.info("未检测到有效的登录Cookie")
return False
except Exception as e:
self.logger.error(f"检查登录状态失败: {e}")
return False
def interactive_login(self) -> Tuple[bool, Optional[str]]:
"""交互式二维码登录
Returns:
(登录是否成功, 错误信息)
"""
try:
print("\n=== 网易云音乐二维码登录 ===")
# 检查现有登录状态
if self.check_existing_login():
choice = input("检测到已有有效登录,是否重新登录?(y/N): ").strip().lower()
if choice not in ['y', 'yes', '']:
print("使用现有登录状态")
return True, None
print("\n开始二维码登录流程...")
# 生成二维码
print("正在生成二维码...")
qr_result = self.qr_manager.create_qr_login()
if not qr_result['success']:
error_msg = f"生成二维码失败: {qr_result.get('message', '未知错误')}"
self.logger.error(error_msg)
return False, error_msg
qr_key = qr_result['qr_key']
print(f"\n二维码已生成!")
print(f"请使用网易云音乐手机APP扫描二维码进行登录")
print(f"二维码有效期: 3分钟")
print("\n等待扫码中...")
# 轮询检查登录状态
max_attempts = 60 # 最多等待5分钟
attempt = 0
while attempt < max_attempts:
try:
# 检查登录状态
status_result = self.qr_manager.check_qr_login(qr_key)
if status_result['success']:
if status_result['status'] == 'success':
# 登录成功
cookie = status_result.get('cookie', '')
if cookie:
# 保存Cookie
success = self.save_cookie(cookie)
if success:
print("\n✅ 登录成功Cookie已保存")
return True, None
else:
error_msg = "登录成功但Cookie保存失败"
self.logger.error(error_msg)
return False, error_msg
else:
error_msg = "登录成功但未获取到Cookie"
self.logger.error(error_msg)
return False, error_msg
elif status_result['status'] == 'waiting':
# 等待扫码
if attempt % 10 == 0: # 每10次显示一次提示
print(f"等待扫码中... ({attempt + 1}/{max_attempts})")
elif status_result['status'] == 'scanned':
# 已扫码,等待确认
print("二维码已扫描,请在手机上确认登录")
elif status_result['status'] == 'expired':
# 二维码过期
error_msg = "二维码已过期,请重新尝试"
print(f"\n{error_msg}")
return False, error_msg
elif status_result['status'] == 'error':
# 登录错误
error_msg = f"登录失败: {status_result.get('message', '未知错误')}"
print(f"\n{error_msg}")
return False, error_msg
else:
self.logger.warning(f"检查登录状态失败: {status_result.get('message', '未知错误')}")
# 等待5秒后重试
time.sleep(5)
attempt += 1
except KeyboardInterrupt:
print("\n用户取消登录")
return False, "用户取消登录"
except Exception as e:
self.logger.error(f"检查登录状态时发生错误: {e}")
time.sleep(5)
attempt += 1
# 超时
error_msg = "登录超时,请重新尝试"
print(f"\n{error_msg}")
return False, error_msg
except APIException as e:
error_msg = f"API调用失败: {e}"
self.logger.error(error_msg)
return False, error_msg
except Exception as e:
error_msg = f"登录过程中发生未知错误: {e}"
self.logger.error(error_msg)
return False, error_msg
def save_cookie(self, cookie: str) -> bool:
"""保存Cookie到文件
Args:
cookie: Cookie字符串
Returns:
是否保存成功
"""
try:
# 备份现有Cookie如果存在
if self.cookie_manager.cookie_file.exists():
try:
backup_path = self.cookie_manager.backup_cookie()
self.logger.info(f"已备份现有Cookie到: {backup_path}")
except Exception as e:
self.logger.warning(f"备份Cookie失败: {e}")
# 保存新Cookie
success = self.cookie_manager.write_cookie(cookie)
if success:
# 验证保存的Cookie
if self.cookie_manager.is_cookie_valid():
self.logger.info("Cookie保存并验证成功")
return True
else:
self.logger.warning("Cookie保存成功但验证失败")
return False
else:
self.logger.error("Cookie保存失败")
return False
except CookieException as e:
self.logger.error(f"Cookie操作失败: {e}")
return False
except Exception as e:
self.logger.error(f"保存Cookie时发生错误: {e}")
return False
def show_login_info(self) -> None:
"""显示登录信息"""
try:
info = self.cookie_manager.get_cookie_info()
print("\n=== 登录状态信息 ===")
print(f"Cookie文件: {info['file_path']}")
print(f"文件存在: {'' if info['file_exists'] else ''}")
print(f"Cookie数量: {info['cookie_count']}")
print(f"登录状态: {'有效' if info['is_valid'] else '无效'}")
if info.get('last_modified'):
print(f"最后更新: {info['last_modified']}")
if info['is_valid']:
present_cookies = info.get('important_cookies_present', [])
print(f"重要Cookie: {', '.join(present_cookies)}")
else:
missing_cookies = info.get('missing_important_cookies', [])
if missing_cookies:
print(f"缺少Cookie: {', '.join(missing_cookies)}")
except Exception as e:
print(f"获取登录信息失败: {e}")
def logout(self) -> bool:
"""登出清除Cookie
Returns:
是否登出成功
"""
try:
# 备份Cookie
if self.cookie_manager.cookie_file.exists():
try:
backup_path = self.cookie_manager.backup_cookie("logout")
print(f"Cookie已备份到: {backup_path}")
except Exception as e:
self.logger.warning(f"备份Cookie失败: {e}")
# 清除Cookie
success = self.cookie_manager.clear_cookie()
if success:
print("已成功登出")
return True
else:
print("登出失败")
return False
except Exception as e:
print(f"登出时发生错误: {e}")
return False
def main():
"""主函数"""
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
client = QRLoginClient()
# 解析命令行参数
if len(sys.argv) > 1:
command = sys.argv[1].lower()
if command == 'login':
# 执行登录
success, error = client.interactive_login()
if success:
print("\n登录完成!")
client.show_login_info()
sys.exit(0)
else:
print(f"\n登录失败: {error}")
sys.exit(1)
elif command == 'status' or command == 'info':
# 显示登录状态
client.show_login_info()
sys.exit(0)
elif command == 'logout':
# 登出
success = client.logout()
sys.exit(0 if success else 1)
elif command == 'help' or command == '-h' or command == '--help':
# 显示帮助
print("网易云音乐二维码登录工具")
print("\n用法:")
print(" python qr_login.py [命令]")
print("\n命令:")
print(" login - 执行二维码登录")
print(" status - 显示登录状态")
print(" logout - 登出清除Cookie")
print(" help - 显示此帮助信息")
print("\n如果不提供命令,将进入交互模式")
sys.exit(0)
else:
print(f"未知命令: {command}")
print("使用 'python qr_login.py help' 查看帮助")
sys.exit(1)
# 交互模式
try:
while True:
print("\n=== 网易云音乐登录工具 ===")
print("1. 二维码登录")
print("2. 查看登录状态")
print("3. 登出")
print("4. 退出")
choice = input("\n请选择操作 (1-4): ").strip()
if choice == '1':
success, error = client.interactive_login()
if success:
print("\n登录成功!")
client.show_login_info()
else:
print(f"\n登录失败: {error}")
elif choice == '2':
client.show_login_info()
elif choice == '3':
client.logout()
elif choice == '4':
print("再见!")
break
else:
print("无效选择,请重试")
except KeyboardInterrupt:
print("\n\n程序已退出")
sys.exit(0)
except Exception as e:
print(f"\n程序运行出错: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

Binary file not shown.

465
使用文档.md Normal file
View File

@ -0,0 +1,465 @@
# 网易云音乐工具箱 - 使用文档
## 📖 项目简介
网易云音乐工具箱是一个功能强大的网易云音乐解析工具支持歌曲搜索、单曲解析、歌单解析、专辑解析和音乐下载等功能。项目提供了友好的Web界面和完整的API接口支持多种音质选择包括无损、Hi-Res等高品质音频格式。
## ✨ 功能特性
### 🎵 核心功能
- **歌曲搜索**:支持关键词搜索网易云音乐库中的歌曲
- **单曲解析**:解析单首歌曲的详细信息和下载链接
- **歌单解析**:批量解析歌单中的所有歌曲
- **专辑解析**:批量解析专辑中的所有歌曲
- **音乐下载**:支持多种音质的音乐文件下载
### 🎧 音质支持
- `standard`:标准音质 (128kbps)
- `exhigh`:极高音质 (320kbps)
- `lossless`:无损音质 (FLAC)
- `hires`Hi-Res音质 (24bit/96kHz)
- `jyeffect`:高清环绕声
- `sky`:沉浸环绕声
- `jymaster`:超清母带
### 🌐 使用方式
- **Web界面**:直观的网页操作界面
- **API接口**完整的RESTful API
- **批量处理**:支持歌单和专辑的批量解析
## 🚀 快速开始
### 环境要求
- Python 3.7+
- 网易云音乐黑胶会员账号(用于获取高音质资源)
### 安装步骤
1. **克隆项目**
```bash
git clone https://github.com/Suxiaoqinx/Netease_url.git
cd Netease_url
```
2. **安装依赖**
```bash
pip install -r requirements.txt
```
3. **配置Cookie**
`cookie.txt` 文件中填入黑胶会员账号的Cookie
```
MUSIC_U=你的MUSIC_U值;os=pc;appver=8.9.70;
```
> 💡 **获取Cookie方法**
> 1. 登录网易云音乐网页版
> 2. 按F12打开开发者工具
> 3. 在Network标签页找到任意请求
> 4. 复制请求头中的Cookie值
4. **启动服务**
```bash
python main.py
```
5. **访问界面**
打开浏览器访问:`http://localhost:5000`
## 🎯 使用指南
### Web界面使用
#### 1. 歌曲搜索
1. 选择「歌曲搜索」功能
2. 输入搜索关键词
3. 点击「搜索」按钮
4. 在搜索结果中点击「解析」或「下载」按钮
#### 2. 单曲解析
1. 选择「单曲解析」功能
2. 输入歌曲ID或网易云音乐链接
3. 选择音质等级
4. 点击「解析」按钮查看歌曲信息
5. 点击「下载」按钮下载音乐文件
#### 3. 歌单解析
1. 选择「歌单解析」功能
2. 输入歌单ID或歌单链接
3. 点击「解析歌单」按钮
4. 查看歌单信息和歌曲列表
5. 点击单首歌曲的「解析」或「下载」按钮
#### 4. 专辑解析
1. 选择「专辑解析」功能
2. 输入专辑ID或专辑链接
3. 点击「解析专辑」按钮
4. 查看专辑信息和歌曲列表
5. 点击单首歌曲的「解析」或「下载」按钮
#### 5. 音乐下载
1. 选择「音乐下载」功能
2. 输入音乐ID或链接
3. 选择下载音质
4. 点击「下载」按钮
5. 等待下载完成
### 链接格式支持
项目支持多种网易云音乐链接格式:
- **歌曲链接**`https://music.163.com/song?id=123456`
- **歌单链接**`https://music.163.com/playlist?id=123456`
- **专辑链接**`https://music.163.com/album?id=123456`
- **直接ID**`123456`
## 🔌 API接口文档
### 基础信息
- **服务地址**`http://localhost:5000`
- **请求方式**POST部分接口支持GET
- **响应格式**JSON
### 接口列表
#### 1. 健康检查
```http
GET /health
```
**响应示例**
```json
{
"status": 200,
"success": true,
"message": "服务运行正常",
"data": {
"service": "网易云音乐API",
"version": "1.0.0",
"uptime": "2小时3分钟"
}
}
```
#### 2. 歌曲搜索
```http
POST /search
```
**请求参数**
```json
{
"keyword": "搜索关键词",
"limit": 30
}
```
**响应示例**
```json
{
"status": 200,
"success": true,
"message": "搜索完成",
"data": [
{
"id": "123456",
"name": "歌曲名称",
"artists": "艺术家",
"album": "专辑名称",
"picUrl": "封面图片URL"
}
]
}
```
#### 3. 单曲解析
```http
POST /song
```
**请求参数**
```json
{
"url": "歌曲ID或链接",
"level": "lossless",
"type": "json"
}
```
**响应示例**
```json
{
"status": 200,
"success": true,
"message": "解析成功",
"data": {
"name": "歌曲名称",
"ar_name": "艺术家",
"al_name": "专辑名称",
"level": "lossless",
"size": "45.2MB",
"url": "下载链接",
"pic": "封面图片",
"lyric": "歌词内容"
}
}
```
#### 4. 歌单解析
```http
POST /playlist
```
**请求参数**
```json
{
"id": "歌单ID"
}
```
**响应示例**
```json
{
"status": 200,
"success": true,
"message": "歌单解析成功",
"data": {
"playlist": {
"name": "歌单名称",
"creator": "创建者",
"description": "歌单描述",
"coverImgUrl": "封面图片",
"trackCount": 50,
"tracks": [
{
"id": "123456",
"name": "歌曲名称",
"artists": "艺术家",
"album": "专辑名称",
"picUrl": "封面图片"
}
]
}
}
}
```
#### 5. 专辑解析
```http
POST /album
```
**请求参数**
```json
{
"id": "专辑ID"
}
```
**响应示例**
```json
{
"status": 200,
"success": true,
"message": "专辑解析成功",
"data": {
"album": {
"name": "专辑名称",
"artist": "艺术家",
"description": "专辑描述",
"coverImgUrl": "封面图片",
"songs": [
{
"id": "123456",
"name": "歌曲名称",
"artists": "艺术家",
"album": "专辑名称",
"picUrl": "封面图片"
}
]
}
}
}
```
#### 6. 音乐下载
```http
POST /download
```
**请求参数**
```json
{
"id": "音乐ID",
"quality": "lossless"
}
```
**响应**:返回音频文件流,支持直接下载
## 🐳 Docker部署
### 使用Docker Compose
1. **修改配置**(可选)
编辑 `docker-compose.yml` 文件修改端口:
```yaml
ports:
- "8080:5000" # 将服务映射到8080端口
```
2. **启动服务**
```bash
docker-compose up -d
```
3. **查看日志**
```bash
docker-compose logs -f
```
4. **停止服务**
```bash
docker-compose down
```
### 使用Dockerfile
```bash
# 构建镜像
docker build -t netease-music-api .
# 运行容器
docker run -d -p 5000:5000 -v $(pwd)/downloads:/app/downloads netease-music-api
```
## ⚙️ 配置说明
### 环境变量
可以通过 `.env` 文件或环境变量配置服务:
```bash
# 服务配置
HOST=0.0.0.0
PORT=5000
DEBUG=false
# 下载配置
DOWNLOADS_DIR=downloads
MAX_FILE_SIZE=524288000 # 500MB
# 日志配置
LOG_LEVEL=INFO
```
### Cookie配置
`cookie.txt` 文件中配置网易云音乐Cookie
```
MUSIC_U=你的MUSIC_U值;os=pc;appver=8.9.70;
```
> ⚠️ **重要提示**
> - 必须使用黑胶会员账号的Cookie
> - Cookie格式必须严格按照示例填写
> - 定期更新Cookie以保持有效性
## 🔧 故障排除
### 常见问题
#### 1. Cookie无效
**问题**提示Cookie无效或过期
**解决方案**
- 确认使用的是黑胶会员账号
- 重新获取Cookie并更新 `cookie.txt`
- 检查Cookie格式是否正确
#### 2. 无法下载高音质
**问题**:只能下载标准音质
**解决方案**
- 确认账号是黑胶会员
- 检查Cookie是否有效
- 确认歌曲本身支持高音质
#### 3. 服务启动失败
**问题**:运行 `python main.py` 报错
**解决方案**
- 检查Python版本需要3.7+
- 安装所有依赖:`pip install -r requirements.txt`
- 检查端口5000是否被占用
#### 4. 下载文件损坏
**问题**:下载的音频文件无法播放
**解决方案**
- 检查网络连接是否稳定
- 重新下载文件
- 尝试其他音质选项
### 日志查看
服务运行时会生成日志文件 `music_api.log`,可以查看详细的错误信息:
```bash
tail -f music_api.log
```
## 📝 开发说明
### 项目结构
```
Netease_url/
├── main.py # 主程序入口
├── music_api.py # 音乐API核心模块
├── music_downloader.py # 音乐下载模块
├── cookie_manager.py # Cookie管理模块
├── qr_login.py # 二维码登录模块
├── templates/
│ └── index.html # Web界面模板
├── downloads/ # 下载文件目录
├── requirements.txt # Python依赖
├── Dockerfile # Docker构建文件
├── docker-compose.yml # Docker Compose配置
└── README.md # 项目说明
```
### 技术栈
- **后端**Flask + Python
- **前端**Bootstrap + jQuery
- **音频处理**mutagen
- **HTTP客户端**aiohttp + requests
- **容器化**Docker
## 📄 许可证
本项目采用 MIT 许可证开源,但**禁止用于任何商业或付费项目**。
## 🤝 贡献指南
欢迎提交Issue和Pull Request
1. Fork本项目
2. 创建特性分支:`git checkout -b feature/new-feature`
3. 提交更改:`git commit -am 'Add new feature'`
4. 推送分支:`git push origin feature/new-feature`
5. 提交Pull Request
## 📞 联系方式
- **GitHub Issues**[提交问题](https://github.com/Suxiaoqinx/Netease_url/issues)
- **作者博客**[https://www.toubiec.cn](https://www.toubiec.cn)
---
**⭐ 如果这个项目对你有帮助请给个Star支持一下**