package common import ( "fmt" "sync" "sync/atomic" "github.com/shadow1ng/fscan/common/i18n" ) /* ConcurrencyMonitor.go - 并发监控器 监控两个层级的并发: 1. 主扫描器线程数 (-t 参数控制) 2. 插件内连接线程数 (-mt 参数控制) */ // ConcurrencyMonitor 并发监控器 type ConcurrencyMonitor struct { // 主扫描器层级 activePluginTasks int64 // 当前活跃的插件任务数 totalPluginTasks int64 // 总插件任务数 // 插件内连接层级 (每个插件的连接线程数) pluginConnections sync.Map // map[string]*PluginConnectionInfo mu sync.RWMutex } // PluginConnectionInfo 单个插件的连接信息 type PluginConnectionInfo struct { PluginName string // 插件名称 Target string // 目标地址 ActiveConnections int64 // 当前活跃连接数 TotalConnections int64 // 总连接数 } var ( globalConcurrencyMonitor *ConcurrencyMonitor concurrencyMutex sync.Once ) // GetConcurrencyMonitor 获取全局并发监控器 func GetConcurrencyMonitor() *ConcurrencyMonitor { concurrencyMutex.Do(func() { globalConcurrencyMonitor = &ConcurrencyMonitor{ activePluginTasks: 0, totalPluginTasks: 0, } }) return globalConcurrencyMonitor } // ============================================================================= // 主扫描器层级监控 // ============================================================================= // StartPluginTask 开始插件任务 func (m *ConcurrencyMonitor) StartPluginTask() { atomic.AddInt64(&m.activePluginTasks, 1) atomic.AddInt64(&m.totalPluginTasks, 1) } // FinishPluginTask 完成插件任务 func (m *ConcurrencyMonitor) FinishPluginTask() { atomic.AddInt64(&m.activePluginTasks, -1) } // GetPluginTaskStats 获取插件任务统计 func (m *ConcurrencyMonitor) GetPluginTaskStats() (active int64, total int64) { return atomic.LoadInt64(&m.activePluginTasks), atomic.LoadInt64(&m.totalPluginTasks) } // ============================================================================= // 插件内连接层级监控 // ============================================================================= // StartConnection 开始连接 func (m *ConcurrencyMonitor) StartConnection(pluginName, target string) { key := fmt.Sprintf("%s@%s", pluginName, target) value, _ := m.pluginConnections.LoadOrStore(key, &PluginConnectionInfo{ PluginName: pluginName, Target: target, }) info := value.(*PluginConnectionInfo) atomic.AddInt64(&info.ActiveConnections, 1) atomic.AddInt64(&info.TotalConnections, 1) } // FinishConnection 完成连接 func (m *ConcurrencyMonitor) FinishConnection(pluginName, target string) { key := fmt.Sprintf("%s@%s", pluginName, target) if value, ok := m.pluginConnections.Load(key); ok { info := value.(*PluginConnectionInfo) atomic.AddInt64(&info.ActiveConnections, -1) } } // GetConnectionStats 获取所有插件连接统计 func (m *ConcurrencyMonitor) GetConnectionStats() map[string]*PluginConnectionInfo { stats := make(map[string]*PluginConnectionInfo) m.pluginConnections.Range(func(key, value interface{}) bool { keyStr := key.(string) info := value.(*PluginConnectionInfo) // 只返回当前活跃的连接 if atomic.LoadInt64(&info.ActiveConnections) > 0 { stats[keyStr] = &PluginConnectionInfo{ PluginName: info.PluginName, Target: info.Target, ActiveConnections: atomic.LoadInt64(&info.ActiveConnections), TotalConnections: atomic.LoadInt64(&info.TotalConnections), } } return true }) return stats } // GetTotalActiveConnections 获取总活跃连接数 func (m *ConcurrencyMonitor) GetTotalActiveConnections() int64 { var total int64 m.pluginConnections.Range(func(key, value interface{}) bool { info := value.(*PluginConnectionInfo) total += atomic.LoadInt64(&info.ActiveConnections) return true }) return total } // Reset 重置监控器 func (m *ConcurrencyMonitor) Reset() { atomic.StoreInt64(&m.activePluginTasks, 0) atomic.StoreInt64(&m.totalPluginTasks, 0) m.pluginConnections.Range(func(key, value interface{}) bool { m.pluginConnections.Delete(key) return true }) } // GetConcurrencyStatus 获取并发状态字符串 func (m *ConcurrencyMonitor) GetConcurrencyStatus() string { activePlugins, _ := m.GetPluginTaskStats() totalConnections := m.GetTotalActiveConnections() if activePlugins == 0 && totalConnections == 0 { return "" } if totalConnections == 0 { return fmt.Sprintf("%s:%d", i18n.GetText("concurrency_plugin"), activePlugins) } return fmt.Sprintf("%s:%d %s:%d", i18n.GetText("concurrency_plugin"), activePlugins, i18n.GetText("concurrency_connection"), totalConnections) } // GetDetailedStatus 获取详细的并发状态 func (m *ConcurrencyMonitor) GetDetailedStatus() string { activePlugins, _ := m.GetPluginTaskStats() connectionStats := m.GetConnectionStats() if activePlugins == 0 && len(connectionStats) == 0 { return i18n.GetText("concurrency_no_active_tasks") } status := fmt.Sprintf("%s: %d", i18n.GetText("concurrency_plugin_tasks"), activePlugins) if len(connectionStats) > 0 { status += " | " + i18n.GetText("concurrency_connection_details") + ": " first := true for _, info := range connectionStats { if !first { status += ", " } status += fmt.Sprintf("%s@%s:%d", info.PluginName, info.Target, info.ActiveConnections) first = false } } return status }