package proxy import ( "context" "fmt" "net" "net/url" "sync" "sync/atomic" "time" "golang.org/x/net/proxy" ) // manager 代理管理器实现 type manager struct { config *ProxyConfig stats *ProxyStats mu sync.RWMutex // 连接池 dialerCache map[string]Dialer cacheExpiry time.Time cacheMu sync.RWMutex } // NewProxyManager 创建新的代理管理器 func NewProxyManager(config *ProxyConfig) ProxyManager { if config == nil { config = DefaultProxyConfig() } return &manager{ config: config, stats: &ProxyStats{ ProxyType: config.Type.String(), ProxyAddress: config.Address, }, dialerCache: make(map[string]Dialer), cacheExpiry: time.Now().Add(5 * time.Minute), } } // GetDialer 获取普通拨号器 func (m *manager) GetDialer() (Dialer, error) { m.mu.RLock() config := m.config m.mu.RUnlock() switch config.Type { case ProxyTypeNone: return m.createDirectDialer(), nil case ProxyTypeSOCKS5: return m.createSOCKS5Dialer() case ProxyTypeHTTP, ProxyTypeHTTPS: return m.createHTTPDialer() default: return nil, NewProxyError(ErrTypeConfig, "不支持的代理类型", 1001, nil) } } // GetTLSDialer 获取TLS拨号器 func (m *manager) GetTLSDialer() (TLSDialer, error) { dialer, err := m.GetDialer() if err != nil { return nil, err } return &tlsDialerWrapper{ dialer: dialer, config: m.config, stats: m.stats, }, nil } // UpdateConfig 更新配置 func (m *manager) UpdateConfig(config *ProxyConfig) error { if config == nil { return NewProxyError(ErrTypeConfig, "配置不能为空", 1002, nil) } m.mu.Lock() defer m.mu.Unlock() m.config = config m.stats.ProxyType = config.Type.String() m.stats.ProxyAddress = config.Address // 清理缓存 m.cacheMu.Lock() m.dialerCache = make(map[string]Dialer) m.cacheExpiry = time.Now().Add(5 * time.Minute) m.cacheMu.Unlock() return nil } // Close 关闭管理器 func (m *manager) Close() error { m.cacheMu.Lock() defer m.cacheMu.Unlock() m.dialerCache = make(map[string]Dialer) return nil } // Stats 获取统计信息 func (m *manager) Stats() *ProxyStats { m.mu.RLock() defer m.mu.RUnlock() // 返回副本以避免并发问题 statsCopy := *m.stats return &statsCopy } // createDirectDialer 创建直连拨号器 func (m *manager) createDirectDialer() Dialer { return &directDialer{ timeout: m.config.Timeout, stats: m.stats, } } // createSOCKS5Dialer 创建SOCKS5拨号器 func (m *manager) createSOCKS5Dialer() (Dialer, error) { // 检查缓存 cacheKey := fmt.Sprintf("socks5_%s", m.config.Address) m.cacheMu.RLock() if time.Now().Before(m.cacheExpiry) { if cached, exists := m.dialerCache[cacheKey]; exists { m.cacheMu.RUnlock() return cached, nil } } m.cacheMu.RUnlock() // 解析代理地址 proxyURL := fmt.Sprintf("socks5://%s", m.config.Address) if m.config.Username != "" { proxyURL = fmt.Sprintf("socks5://%s:%s@%s", m.config.Username, m.config.Password, m.config.Address) } u, err := url.Parse(proxyURL) if err != nil { return nil, NewProxyError(ErrTypeConfig, "SOCKS5代理地址解析失败", 2001, err) } // 创建基础拨号器 baseDial := &net.Dialer{ Timeout: m.config.Timeout, KeepAlive: m.config.KeepAlive, } // 创建SOCKS5拨号器 var auth *proxy.Auth if u.User != nil { auth = &proxy.Auth{ User: u.User.Username(), } if password, hasPassword := u.User.Password(); hasPassword { auth.Password = password } } socksDialer, err := proxy.SOCKS5("tcp", u.Host, auth, baseDial) if err != nil { return nil, NewProxyError(ErrTypeConnection, "SOCKS5拨号器创建失败", 2002, err) } dialer := &socks5Dialer{ dialer: socksDialer, config: m.config, stats: m.stats, } // 更新缓存 m.cacheMu.Lock() m.dialerCache[cacheKey] = dialer m.cacheExpiry = time.Now().Add(5 * time.Minute) m.cacheMu.Unlock() return dialer, nil } // createHTTPDialer 创建HTTP代理拨号器 func (m *manager) createHTTPDialer() (Dialer, error) { // 检查缓存 cacheKey := fmt.Sprintf("http_%s", m.config.Address) m.cacheMu.RLock() if time.Now().Before(m.cacheExpiry) { if cached, exists := m.dialerCache[cacheKey]; exists { m.cacheMu.RUnlock() return cached, nil } } m.cacheMu.RUnlock() dialer := &httpDialer{ config: m.config, stats: m.stats, baseDial: &net.Dialer{ Timeout: m.config.Timeout, KeepAlive: m.config.KeepAlive, }, } // 更新缓存 m.cacheMu.Lock() m.dialerCache[cacheKey] = dialer m.cacheExpiry = time.Now().Add(5 * time.Minute) m.cacheMu.Unlock() return dialer, nil } // directDialer 直连拨号器 type directDialer struct { timeout time.Duration stats *ProxyStats } func (d *directDialer) Dial(network, address string) (net.Conn, error) { return d.DialContext(context.Background(), network, address) } func (d *directDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { start := time.Now() atomic.AddInt64(&d.stats.TotalConnections, 1) dialer := &net.Dialer{ Timeout: d.timeout, } conn, err := dialer.DialContext(ctx, network, address) duration := time.Since(start) d.stats.LastConnectTime = start if err != nil { atomic.AddInt64(&d.stats.FailedConnections, 1) d.stats.LastError = err.Error() return nil, NewProxyError(ErrTypeConnection, "直连失败", 3001, err) } atomic.AddInt64(&d.stats.ActiveConnections, 1) d.updateAverageConnectTime(duration) return &trackedConn{ Conn: conn, stats: d.stats, }, nil } // socks5Dialer SOCKS5拨号器 type socks5Dialer struct { dialer proxy.Dialer config *ProxyConfig stats *ProxyStats } func (s *socks5Dialer) Dial(network, address string) (net.Conn, error) { return s.DialContext(context.Background(), network, address) } func (s *socks5Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { start := time.Now() atomic.AddInt64(&s.stats.TotalConnections, 1) // 创建一个带超时的上下文 dialCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) defer cancel() // 使用goroutine处理拨号,以支持取消 connChan := make(chan struct { conn net.Conn err error }, 1) go func() { conn, err := s.dialer.Dial(network, address) select { case <-dialCtx.Done(): if conn != nil { conn.Close() } case connChan <- struct { conn net.Conn err error }{conn, err}: } }() select { case <-dialCtx.Done(): atomic.AddInt64(&s.stats.FailedConnections, 1) s.stats.LastError = dialCtx.Err().Error() return nil, NewProxyError(ErrTypeTimeout, "SOCKS5连接超时", 3002, dialCtx.Err()) case result := <-connChan: duration := time.Since(start) s.stats.LastConnectTime = start if result.err != nil { atomic.AddInt64(&s.stats.FailedConnections, 1) s.stats.LastError = result.err.Error() return nil, NewProxyError(ErrTypeConnection, "SOCKS5连接失败", 3003, result.err) } atomic.AddInt64(&s.stats.ActiveConnections, 1) s.updateAverageConnectTime(duration) return &trackedConn{ Conn: result.conn, stats: s.stats, }, nil } } // updateAverageConnectTime 更新平均连接时间 func (d *directDialer) updateAverageConnectTime(duration time.Duration) { // 简单的移动平均 if d.stats.AverageConnectTime == 0 { d.stats.AverageConnectTime = duration } else { d.stats.AverageConnectTime = (d.stats.AverageConnectTime + duration) / 2 } } func (s *socks5Dialer) updateAverageConnectTime(duration time.Duration) { // 简单的移动平均 if s.stats.AverageConnectTime == 0 { s.stats.AverageConnectTime = duration } else { s.stats.AverageConnectTime = (s.stats.AverageConnectTime + duration) / 2 } }