// Package rabbitmq implements the RabbitMQ (AMQP 0-9-1) service connector:
// a TCP reachability probe for service detection plus credential checking.
package rabbitmq

import (
	"context"
	"fmt"
	"net"
	"net/url"
	"strconv"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/shadow1ng/fscan/common"
	"github.com/shadow1ng/fscan/common/i18n"
	"github.com/shadow1ng/fscan/plugins/base"
)

// RabbitMQConnector is the RabbitMQ connector implementation.
//
// Connect records the target host and port on the connector; Authenticate
// reads them back to build the AMQP URI for each credential attempt.
type RabbitMQConnector struct {
	host string
	port int
}

// RabbitMQConnection describes a (possibly unauthenticated) RabbitMQ endpoint.
type RabbitMQConnection struct {
	conn     *amqp.Connection // nil until Authenticate succeeds
	amqpURL  string           // AMQP URI last associated with this endpoint
	username string           // set after a successful Authenticate
	password string           // set after a successful Authenticate
	info     string           // human-readable service description
}

// NewRabbitMQConnector creates a RabbitMQ connector.
func NewRabbitMQConnector() *RabbitMQConnector {
	return &RabbitMQConnector{}
}

// rabbitMQURL builds an AMQP URI for the default vhost. Credentials are
// percent-encoded so that characters such as '@', '/', ':' or '#' in a
// username or password cannot corrupt the URI, and IPv6 literals are
// bracketed by net.JoinHostPort.
func rabbitMQURL(username, password, host string, port int) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(username, password),
		Host:   net.JoinHostPort(host, strconv.Itoa(port)),
		Path:   "/",
	}
	return u.String()
}

// Connect checks TCP reachability of the target and returns a
// *RabbitMQConnection placeholder for service detection. No AMQP handshake or
// authentication is performed at this stage.
//
// It returns an error when info.Ports is not a valid integer, when the TCP
// dial fails, or ctx.Err() when ctx is cancelled before the dial completes.
func (c *RabbitMQConnector) Connect(ctx context.Context, info *common.HostInfo) (interface{}, error) {
	// Parse the port.
	port, err := strconv.Atoi(info.Ports)
	if err != nil {
		return nil, fmt.Errorf("无效的端口号: %s", info.Ports)
	}

	c.host = info.Host
	c.port = port

	// Service detection only needs port connectivity. DialContext honours
	// both the timeout and ctx cancellation, so no helper goroutine is needed.
	dialer := &net.Dialer{Timeout: time.Duration(common.Timeout) * time.Second}
	tcpConn, err := dialer.DialContext(ctx, "tcp", net.JoinHostPort(info.Host, info.Ports))
	if err != nil {
		// Report cancellation as the context's own error.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		return nil, err
	}
	// The socket was only a probe; a close error carries no useful signal.
	_ = tcpConn.Close()

	// TCP connect succeeded: return a basic connection object for detection.
	return &RabbitMQConnection{
		conn:    nil, // no real AMQP connection during service detection
		amqpURL: rabbitMQURL("guest", "guest", info.Host, port),
		info:    "RabbitMQ Service (Detected)",
	}, nil
}

// Authenticate attempts an AMQP login with cred against the host and port
// recorded by Connect.
//
// On success, and when conn is a *RabbitMQConnection, the authenticated AMQP
// connection and the credentials are stored on it (any previous AMQP
// connection is closed first). It returns a formatted "rabbitmq_auth_failed"
// error when the dial or handshake fails, or ctx.Err() when ctx is cancelled
// before a result is available.
func (c *RabbitMQConnector) Authenticate(ctx context.Context, conn interface{}, cred *base.Credential) error {
	timeout := time.Duration(common.Timeout) * time.Second
	amqpURL := rabbitMQURL(cred.Username, cred.Password, c.host, c.port)

	// Result channel; the buffer of one lets the worker finish even when
	// nobody is receiving any more.
	type authResult struct {
		conn *amqp.Connection
		err  error
	}
	resultChan := make(chan authResult, 1)

	// Attempt the authenticated connection in a goroutine.
	go func() {
		config := amqp.Config{
			Dial: func(network, addr string) (net.Conn, error) {
				dialer := &net.Dialer{Timeout: timeout}
				netConn, err := dialer.DialContext(ctx, network, addr)
				if err != nil {
					return nil, err
				}
				// Bound the AMQP handshake as well: a port that accepts TCP
				// but never answers would otherwise block this goroutine
				// indefinitely.
				if err := netConn.SetDeadline(time.Now().Add(timeout)); err != nil {
					_ = netConn.Close()
					return nil, err
				}
				return netConn, nil
			},
		}

		authConn, err := amqp.DialConfig(amqpURL, config)
		// Always hand the result over; the buffered channel never blocks and
		// whoever receives it owns the connection.
		resultChan <- authResult{authConn, err}
	}()

	// Wait for the authentication result.
	select {
	case result := <-resultChan:
		if result.err != nil {
			return fmt.Errorf(i18n.GetText("rabbitmq_auth_failed"), result.err)
		}

		rabbitConn, ok := conn.(*RabbitMQConnection)
		if !ok {
			// Nothing can take ownership of the new connection; close it
			// instead of leaking it.
			_ = result.conn.Close()
			return nil
		}

		// Close the old connection before replacing it.
		if rabbitConn.conn != nil {
			_ = rabbitConn.conn.Close()
		}
		// Switch to the authenticated connection.
		rabbitConn.conn = result.conn
		rabbitConn.username = cred.Username
		rabbitConn.password = cred.Password
		rabbitConn.amqpURL = amqpURL
		return nil

	case <-ctx.Done():
		// The worker may still deliver a connection after cancellation;
		// drain the channel in the background and close whatever arrives.
		go func() {
			if late := <-resultChan; late.conn != nil {
				_ = late.conn.Close()
			}
		}()
		return ctx.Err()
	}
}

// Close closes the AMQP connection held by conn, if any. It returns an error
// only when conn is not a *RabbitMQConnection.
func (c *RabbitMQConnector) Close(conn interface{}) error {
	rabbitConn, ok := conn.(*RabbitMQConnection)
	if !ok {
		return fmt.Errorf("无效的RabbitMQ连接类型")
	}
	if rabbitConn.conn != nil {
		// Best-effort close: the result is intentionally not reported.
		_ = rabbitConn.conn.Close()
	}
	return nil
}

// GetConnectionInfo returns descriptive key/value pairs for the connection.
// The "username" and "authenticated" keys are present only when a username
// has been recorded.
func (conn *RabbitMQConnection) GetConnectionInfo() map[string]interface{} {
	info := map[string]interface{}{
		"protocol": "AMQP",
		"service":  "RabbitMQ",
		"info":     conn.info,
	}

	if conn.username != "" {
		info["username"] = conn.username
		info["authenticated"] = true
	}

	return info
}

// IsAlive reports whether an AMQP connection is held and not yet closed.
func (conn *RabbitMQConnection) IsAlive() bool {
	if conn.conn == nil {
		return false
	}
	return !conn.conn.IsClosed()
}

// GetServerInfo returns the service description string.
func (conn *RabbitMQConnection) GetServerInfo() string {
	return conn.info
}