Netease_url/cookie_manager.py
2025-08-25 01:23:41 +08:00

470 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Cookie管理器模块
提供网易云音乐Cookie管理功能包括
- Cookie文件读取和写入
- Cookie格式验证和解析
- Cookie有效性检查
- 自动过期处理
"""
import os
import json
import time
from typing import Dict, Optional, List, Tuple, Any
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
@dataclass
class CookieInfo:
"""Cookie信息数据类"""
name: str
value: str
domain: str = ""
path: str = "/"
expires: Optional[int] = None
secure: bool = False
http_only: bool = False
class CookieException(Exception):
"""Cookie相关异常类"""
pass
class CookieManager:
"""Cookie管理器主类"""
def __init__(self, cookie_file: str = "cookie.txt"):
"""
初始化Cookie管理器
Args:
cookie_file: Cookie文件路径
"""
self.cookie_file = Path(cookie_file)
self.logger = logging.getLogger(__name__)
# 网易云音乐相关的重要Cookie字段
self.important_cookies = {
'MUSIC_U', # 用户标识
'MUSIC_A', # 用户认证
'__csrf', # CSRF令牌
'NMTID', # 设备标识
'WEVNSM', # 会话管理
'WNMCID', # 客户端标识
}
# 确保cookie文件存在
self._ensure_cookie_file_exists()
def _ensure_cookie_file_exists(self) -> None:
"""确保Cookie文件存在"""
if not self.cookie_file.exists():
self.cookie_file.touch()
self.logger.info(f"创建Cookie文件: {self.cookie_file}")
def read_cookie(self) -> str:
"""读取Cookie文件内容
Returns:
Cookie字符串内容
Raises:
CookieException: 读取失败时抛出
"""
try:
if not self.cookie_file.exists():
self.logger.warning(f"Cookie文件不存在: {self.cookie_file}")
return ""
content = self.cookie_file.read_text(encoding='utf-8').strip()
if not content:
self.logger.warning("Cookie文件为空")
return ""
self.logger.debug(f"成功读取Cookie文件长度: {len(content)}")
return content
except UnicodeDecodeError as e:
raise CookieException(f"Cookie文件编码错误: {e}")
except PermissionError as e:
raise CookieException(f"没有权限读取Cookie文件: {e}")
except Exception as e:
raise CookieException(f"读取Cookie文件失败: {e}")
def write_cookie(self, cookie_content: str) -> bool:
"""写入Cookie到文件
Args:
cookie_content: Cookie内容字符串
Returns:
是否写入成功
Raises:
CookieException: 写入失败时抛出
"""
try:
if not cookie_content or not cookie_content.strip():
raise CookieException("Cookie内容不能为空")
# 验证Cookie格式
if not self.validate_cookie_format(cookie_content):
raise CookieException("Cookie格式无效")
# 写入文件
self.cookie_file.write_text(cookie_content.strip(), encoding='utf-8')
self.logger.info(f"成功写入Cookie到文件: {self.cookie_file}")
return True
except PermissionError as e:
raise CookieException(f"没有权限写入Cookie文件: {e}")
except Exception as e:
raise CookieException(f"写入Cookie文件失败: {e}")
def parse_cookies(self) -> Dict[str, str]:
"""解析Cookie字符串为字典
Returns:
Cookie字典
Raises:
CookieException: 解析失败时抛出
"""
try:
cookie_content = self.read_cookie()
if not cookie_content:
return {}
return self.parse_cookie_string(cookie_content)
except Exception as e:
raise CookieException(f"解析Cookie失败: {e}")
def parse_cookie_string(self, cookie_string: str) -> Dict[str, str]:
"""解析Cookie字符串
Args:
cookie_string: Cookie字符串
Returns:
Cookie字典
"""
if not cookie_string or not cookie_string.strip():
return {}
cookies = {}
try:
# 处理多种Cookie格式
cookie_string = cookie_string.strip()
# 分割Cookie项
cookie_pairs = []
if ';' in cookie_string:
cookie_pairs = cookie_string.split(';')
elif '\n' in cookie_string:
cookie_pairs = cookie_string.split('\n')
else:
cookie_pairs = [cookie_string]
for pair in cookie_pairs:
pair = pair.strip()
if not pair or '=' not in pair:
continue
# 分割键值对
key, value = pair.split('=', 1)
key = key.strip()
value = value.strip()
if key and value:
cookies[key] = value
self.logger.debug(f"解析得到 {len(cookies)} 个Cookie项")
return cookies
except Exception as e:
self.logger.error(f"解析Cookie字符串失败: {e}")
return {}
def validate_cookie_format(self, cookie_string: str) -> bool:
"""验证Cookie格式是否有效
Args:
cookie_string: Cookie字符串
Returns:
是否格式有效
"""
if not cookie_string or not cookie_string.strip():
return False
try:
# 尝试解析Cookie
cookies = self.parse_cookie_string(cookie_string)
# 检查是否至少包含一个有效的Cookie
if not cookies:
return False
# 检查Cookie名称是否合法
for name, value in cookies.items():
if not name or not isinstance(name, str):
return False
if not isinstance(value, str):
return False
# 检查是否包含非法字符
if any(char in name for char in [' ', '\t', '\n', '\r', ';', ',']):
return False
return True
except Exception:
return False
def is_cookie_valid(self) -> bool:
"""检查Cookie是否有效
Returns:
Cookie是否有效
"""
try:
cookies = self.parse_cookies()
if not cookies:
self.logger.warning("Cookie为空")
return False
# 检查重要Cookie是否存在
missing_cookies = self.important_cookies - set(cookies.keys())
if missing_cookies:
self.logger.warning(f"缺少重要Cookie: {missing_cookies}")
return False
# 检查MUSIC_U是否有效基本验证
music_u = cookies.get('MUSIC_U', '')
if not music_u or len(music_u) < 10:
self.logger.warning("MUSIC_U Cookie无效")
return False
self.logger.debug("Cookie验证通过")
return True
except Exception as e:
self.logger.error(f"Cookie验证失败: {e}")
return False
def get_cookie_info(self) -> Dict[str, Any]:
"""获取Cookie详细信息
Returns:
包含Cookie信息的字典
"""
try:
cookies = self.parse_cookies()
info = {
'file_path': str(self.cookie_file),
'file_exists': self.cookie_file.exists(),
'file_size': self.cookie_file.stat().st_size if self.cookie_file.exists() else 0,
'cookie_count': len(cookies),
'is_valid': self.is_cookie_valid(),
'important_cookies_present': list(self.important_cookies & set(cookies.keys())),
'missing_important_cookies': list(self.important_cookies - set(cookies.keys())),
'all_cookie_names': list(cookies.keys())
}
# 添加文件修改时间
if self.cookie_file.exists():
mtime = self.cookie_file.stat().st_mtime
info['last_modified'] = datetime.fromtimestamp(mtime).isoformat()
return info
except Exception as e:
return {
'error': str(e),
'file_path': str(self.cookie_file),
'file_exists': False,
'is_valid': False
}
def backup_cookie(self, backup_suffix: str = None) -> str:
"""备份Cookie文件
Args:
backup_suffix: 备份文件后缀,默认使用时间戳
Returns:
备份文件路径
Raises:
CookieException: 备份失败时抛出
"""
try:
if not self.cookie_file.exists():
raise CookieException("Cookie文件不存在无法备份")
if backup_suffix is None:
backup_suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = self.cookie_file.with_suffix(f".{backup_suffix}.bak")
# 复制文件内容
content = self.cookie_file.read_text(encoding='utf-8')
backup_path.write_text(content, encoding='utf-8')
self.logger.info(f"Cookie备份成功: {backup_path}")
return str(backup_path)
except Exception as e:
raise CookieException(f"备份Cookie文件失败: {e}")
def restore_cookie(self, backup_path: str) -> bool:
"""从备份恢复Cookie
Args:
backup_path: 备份文件路径
Returns:
是否恢复成功
Raises:
CookieException: 恢复失败时抛出
"""
try:
backup_file = Path(backup_path)
if not backup_file.exists():
raise CookieException(f"备份文件不存在: {backup_path}")
# 读取备份内容
backup_content = backup_file.read_text(encoding='utf-8')
# 验证备份内容
if not self.validate_cookie_format(backup_content):
raise CookieException("备份文件中的Cookie格式无效")
# 写入当前Cookie文件
self.write_cookie(backup_content)
self.logger.info(f"从备份恢复Cookie成功: {backup_path}")
return True
except Exception as e:
raise CookieException(f"恢复Cookie失败: {e}")
def clear_cookie(self) -> bool:
"""清空Cookie文件
Returns:
是否清空成功
"""
try:
if self.cookie_file.exists():
self.cookie_file.write_text("", encoding='utf-8')
self.logger.info("Cookie文件已清空")
return True
except Exception as e:
self.logger.error(f"清空Cookie文件失败: {e}")
return False
def update_cookie(self, new_cookies: Dict[str, str]) -> bool:
"""更新Cookie
Args:
new_cookies: 新的Cookie字典
Returns:
是否更新成功
"""
try:
if not new_cookies:
raise CookieException("新Cookie不能为空")
# 读取现有Cookie
existing_cookies = self.parse_cookies()
# 合并Cookie
existing_cookies.update(new_cookies)
# 转换为Cookie字符串
cookie_string = '; '.join(f"{k}={v}" for k, v in existing_cookies.items())
# 写入文件
return self.write_cookie(cookie_string)
except Exception as e:
self.logger.error(f"更新Cookie失败: {e}")
return False
def get_cookie_for_request(self) -> Dict[str, str]:
"""获取用于HTTP请求的Cookie字典
Returns:
适用于requests库的Cookie字典
"""
try:
cookies = self.parse_cookies()
# 过滤掉空值
filtered_cookies = {k: v for k, v in cookies.items() if k and v}
return filtered_cookies
except Exception as e:
self.logger.error(f"获取请求Cookie失败: {e}")
return {}
def format_cookie_string(self, cookies: Dict[str, str]) -> str:
"""将Cookie字典格式化为字符串
Args:
cookies: Cookie字典
Returns:
Cookie字符串
"""
if not cookies:
return ""
return '; '.join(f"{k}={v}" for k, v in cookies.items() if k and v)
def __str__(self) -> str:
"""字符串表示"""
info = self.get_cookie_info()
return f"CookieManager(file={info['file_path']}, valid={info['is_valid']}, count={info['cookie_count']})"
def __repr__(self) -> str:
"""详细字符串表示"""
return self.__str__()
if __name__ == "__main__":
# 测试代码
import sys
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
manager = CookieManager()
print("Cookie管理器模块")
print("支持的功能:")
print("- Cookie文件读写")
print("- Cookie格式验证")
print("- Cookie有效性检查")
print("- Cookie备份和恢复")
print("- Cookie信息查看")
# 显示当前Cookie信息
info = manager.get_cookie_info()
print(f"\n当前Cookie状态: {manager}")
print(f"详细信息: {info}")