fscan/Common/proxy/Manager.go
ZacharyZcR 879293e680 refactor: 重构代理系统为模块化架构
将Proxy.go的代理连接逻辑重构为完整的模块化系统,
提供更好的性能、可维护性和功能扩展。

主要变更:
- Proxy.go: 从192行单体实现重构为简洁包装层
- 新增proxy模块包含5个核心文件:Types.go、Manager.go等
- 支持多种代理类型:HTTP、HTTPS、SOCKS5、直连
- 实现连接池、缓存机制和智能资源管理
- 添加详细的连接统计和性能监控
- 提供线程安全的配置管理和动态更新
- 完全保持API向后兼容性,无需修改调用代码

新增功能:
- HTTP/HTTPS代理支持(原来仅支持SOCKS5)
- 连接跟踪和统计分析
- 错误分类和详细上下文信息
- 配置验证和URL解析
- 全局代理管理实例

测试验证:
- 编译无错误 ✓
- 基础连接功能正常 ✓
- 向后兼容性验证通过 ✓
2025-08-05 03:36:53 +08:00

337 lines
7.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package proxy
import (
"context"
"fmt"
"net"
"net/url"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/proxy"
)
// manager 代理管理器实现
type manager struct {
config *ProxyConfig
stats *ProxyStats
mu sync.RWMutex
// 连接池
dialerCache map[string]Dialer
cacheExpiry time.Time
cacheMu sync.RWMutex
}
// NewProxyManager 创建新的代理管理器
func NewProxyManager(config *ProxyConfig) ProxyManager {
if config == nil {
config = DefaultProxyConfig()
}
return &manager{
config: config,
stats: &ProxyStats{
ProxyType: config.Type.String(),
ProxyAddress: config.Address,
},
dialerCache: make(map[string]Dialer),
cacheExpiry: time.Now().Add(5 * time.Minute),
}
}
// GetDialer 获取普通拨号器
func (m *manager) GetDialer() (Dialer, error) {
m.mu.RLock()
config := m.config
m.mu.RUnlock()
switch config.Type {
case ProxyTypeNone:
return m.createDirectDialer(), nil
case ProxyTypeSOCKS5:
return m.createSOCKS5Dialer()
case ProxyTypeHTTP, ProxyTypeHTTPS:
return m.createHTTPDialer()
default:
return nil, NewProxyError(ErrTypeConfig, "不支持的代理类型", 1001, nil)
}
}
// GetTLSDialer 获取TLS拨号器
func (m *manager) GetTLSDialer() (TLSDialer, error) {
dialer, err := m.GetDialer()
if err != nil {
return nil, err
}
return &tlsDialerWrapper{
dialer: dialer,
config: m.config,
stats: m.stats,
}, nil
}
// UpdateConfig 更新配置
func (m *manager) UpdateConfig(config *ProxyConfig) error {
if config == nil {
return NewProxyError(ErrTypeConfig, "配置不能为空", 1002, nil)
}
m.mu.Lock()
defer m.mu.Unlock()
m.config = config
m.stats.ProxyType = config.Type.String()
m.stats.ProxyAddress = config.Address
// 清理缓存
m.cacheMu.Lock()
m.dialerCache = make(map[string]Dialer)
m.cacheExpiry = time.Now().Add(5 * time.Minute)
m.cacheMu.Unlock()
return nil
}
// Close 关闭管理器
func (m *manager) Close() error {
m.cacheMu.Lock()
defer m.cacheMu.Unlock()
m.dialerCache = make(map[string]Dialer)
return nil
}
// Stats 获取统计信息
func (m *manager) Stats() *ProxyStats {
m.mu.RLock()
defer m.mu.RUnlock()
// 返回副本以避免并发问题
statsCopy := *m.stats
return &statsCopy
}
// createDirectDialer 创建直连拨号器
func (m *manager) createDirectDialer() Dialer {
return &directDialer{
timeout: m.config.Timeout,
stats: m.stats,
}
}
// createSOCKS5Dialer 创建SOCKS5拨号器
func (m *manager) createSOCKS5Dialer() (Dialer, error) {
// 检查缓存
cacheKey := fmt.Sprintf("socks5_%s", m.config.Address)
m.cacheMu.RLock()
if time.Now().Before(m.cacheExpiry) {
if cached, exists := m.dialerCache[cacheKey]; exists {
m.cacheMu.RUnlock()
return cached, nil
}
}
m.cacheMu.RUnlock()
// 解析代理地址
proxyURL := fmt.Sprintf("socks5://%s", m.config.Address)
if m.config.Username != "" {
proxyURL = fmt.Sprintf("socks5://%s:%s@%s",
m.config.Username, m.config.Password, m.config.Address)
}
u, err := url.Parse(proxyURL)
if err != nil {
return nil, NewProxyError(ErrTypeConfig, "SOCKS5代理地址解析失败", 2001, err)
}
// 创建基础拨号器
baseDial := &net.Dialer{
Timeout: m.config.Timeout,
KeepAlive: m.config.KeepAlive,
}
// 创建SOCKS5拨号器
var auth *proxy.Auth
if u.User != nil {
auth = &proxy.Auth{
User: u.User.Username(),
}
if password, hasPassword := u.User.Password(); hasPassword {
auth.Password = password
}
}
socksDialer, err := proxy.SOCKS5("tcp", u.Host, auth, baseDial)
if err != nil {
return nil, NewProxyError(ErrTypeConnection, "SOCKS5拨号器创建失败", 2002, err)
}
dialer := &socks5Dialer{
dialer: socksDialer,
config: m.config,
stats: m.stats,
}
// 更新缓存
m.cacheMu.Lock()
m.dialerCache[cacheKey] = dialer
m.cacheExpiry = time.Now().Add(5 * time.Minute)
m.cacheMu.Unlock()
return dialer, nil
}
// createHTTPDialer 创建HTTP代理拨号器
func (m *manager) createHTTPDialer() (Dialer, error) {
// 检查缓存
cacheKey := fmt.Sprintf("http_%s", m.config.Address)
m.cacheMu.RLock()
if time.Now().Before(m.cacheExpiry) {
if cached, exists := m.dialerCache[cacheKey]; exists {
m.cacheMu.RUnlock()
return cached, nil
}
}
m.cacheMu.RUnlock()
dialer := &httpDialer{
config: m.config,
stats: m.stats,
baseDial: &net.Dialer{
Timeout: m.config.Timeout,
KeepAlive: m.config.KeepAlive,
},
}
// 更新缓存
m.cacheMu.Lock()
m.dialerCache[cacheKey] = dialer
m.cacheExpiry = time.Now().Add(5 * time.Minute)
m.cacheMu.Unlock()
return dialer, nil
}
// directDialer 直连拨号器
type directDialer struct {
timeout time.Duration
stats *ProxyStats
}
func (d *directDialer) Dial(network, address string) (net.Conn, error) {
return d.DialContext(context.Background(), network, address)
}
func (d *directDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
start := time.Now()
atomic.AddInt64(&d.stats.TotalConnections, 1)
dialer := &net.Dialer{
Timeout: d.timeout,
}
conn, err := dialer.DialContext(ctx, network, address)
duration := time.Since(start)
d.stats.LastConnectTime = start
if err != nil {
atomic.AddInt64(&d.stats.FailedConnections, 1)
d.stats.LastError = err.Error()
return nil, NewProxyError(ErrTypeConnection, "直连失败", 3001, err)
}
atomic.AddInt64(&d.stats.ActiveConnections, 1)
d.updateAverageConnectTime(duration)
return &trackedConn{
Conn: conn,
stats: d.stats,
}, nil
}
// socks5Dialer SOCKS5拨号器
type socks5Dialer struct {
dialer proxy.Dialer
config *ProxyConfig
stats *ProxyStats
}
func (s *socks5Dialer) Dial(network, address string) (net.Conn, error) {
return s.DialContext(context.Background(), network, address)
}
func (s *socks5Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
start := time.Now()
atomic.AddInt64(&s.stats.TotalConnections, 1)
// 创建一个带超时的上下文
dialCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()
// 使用goroutine处理拨号以支持取消
connChan := make(chan struct {
conn net.Conn
err error
}, 1)
go func() {
conn, err := s.dialer.Dial(network, address)
select {
case <-dialCtx.Done():
if conn != nil {
conn.Close()
}
case connChan <- struct {
conn net.Conn
err error
}{conn, err}:
}
}()
select {
case <-dialCtx.Done():
atomic.AddInt64(&s.stats.FailedConnections, 1)
s.stats.LastError = dialCtx.Err().Error()
return nil, NewProxyError(ErrTypeTimeout, "SOCKS5连接超时", 3002, dialCtx.Err())
case result := <-connChan:
duration := time.Since(start)
s.stats.LastConnectTime = start
if result.err != nil {
atomic.AddInt64(&s.stats.FailedConnections, 1)
s.stats.LastError = result.err.Error()
return nil, NewProxyError(ErrTypeConnection, "SOCKS5连接失败", 3003, result.err)
}
atomic.AddInt64(&s.stats.ActiveConnections, 1)
s.updateAverageConnectTime(duration)
return &trackedConn{
Conn: result.conn,
stats: s.stats,
}, nil
}
}
// updateAverageConnectTime 更新平均连接时间
func (d *directDialer) updateAverageConnectTime(duration time.Duration) {
// 简单的移动平均
if d.stats.AverageConnectTime == 0 {
d.stats.AverageConnectTime = duration
} else {
d.stats.AverageConnectTime = (d.stats.AverageConnectTime + duration) / 2
}
}
func (s *socks5Dialer) updateAverageConnectTime(duration time.Duration) {
// 简单的移动平均
if s.stats.AverageConnectTime == 0 {
s.stats.AverageConnectTime = duration
} else {
s.stats.AverageConnectTime = (s.stats.AverageConnectTime + duration) / 2
}
}