// Mirror of https://github.com/shadow1ng/fscan.git
// Synced 2025-09-14 14:06:44 +08:00 (mirror listing: 195 lines, 4.4 KiB, Go).
package rabbitmq
|
||
|
||
import (
	"context"
	"fmt"
	"net"
	"net/url"
	"strconv"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/shadow1ng/fscan/common"
	"github.com/shadow1ng/fscan/common/i18n"
	"github.com/shadow1ng/fscan/plugins/base"
)
|
||
|
||
// RabbitMQConnector is the RabbitMQ connector implementation. It remembers the
// target recorded by Connect so that Authenticate can dial the same endpoint.
type RabbitMQConnector struct {
	// host is the target host, copied from HostInfo.Host by Connect.
	host string
	// port is the numeric target port, parsed from HostInfo.Ports by Connect.
	port int
}
|
||
|
||
// RabbitMQConnection RabbitMQ连接结构
|
||
type RabbitMQConnection struct {
|
||
conn *amqp.Connection
|
||
amqpURL string
|
||
username string
|
||
password string
|
||
info string
|
||
}
|
||
|
||
// NewRabbitMQConnector 创建RabbitMQ连接器
|
||
func NewRabbitMQConnector() *RabbitMQConnector {
|
||
return &RabbitMQConnector{}
|
||
}
|
||
|
||
// Connect 建立RabbitMQ连接
|
||
func (c *RabbitMQConnector) Connect(ctx context.Context, info *common.HostInfo) (interface{}, error) {
|
||
// 解析端口
|
||
port, err := strconv.Atoi(info.Ports)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("无效的端口号: %s", info.Ports)
|
||
}
|
||
|
||
c.host = info.Host
|
||
c.port = port
|
||
|
||
// 对于服务识别,只需要检查端口连通性,不需要AMQP认证
|
||
timeout := time.Duration(common.Timeout) * time.Second
|
||
address := fmt.Sprintf("%s:%s", info.Host, info.Ports)
|
||
|
||
// 结果通道
|
||
type connResult struct {
|
||
conn *RabbitMQConnection
|
||
err error
|
||
banner string
|
||
}
|
||
resultChan := make(chan connResult, 1)
|
||
|
||
// 在协程中尝试连接
|
||
go func() {
|
||
// 首先检查TCP连通性
|
||
tcpConn, err := net.DialTimeout("tcp", address, timeout)
|
||
if err != nil {
|
||
select {
|
||
case <-ctx.Done():
|
||
case resultChan <- connResult{nil, err, ""}:
|
||
}
|
||
return
|
||
}
|
||
tcpConn.Close()
|
||
|
||
// TCP连接成功,创建一个基础的连接对象用于服务识别
|
||
rabbitConn := &RabbitMQConnection{
|
||
conn: nil, // 服务识别阶段不需要真正的AMQP连接
|
||
amqpURL: fmt.Sprintf("amqp://guest:guest@%s:%s/", info.Host, info.Ports),
|
||
info: "RabbitMQ Service (Detected)",
|
||
}
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
case resultChan <- connResult{rabbitConn, nil, "RabbitMQ Service (Detected)"}:
|
||
}
|
||
}()
|
||
|
||
// 等待连接结果
|
||
select {
|
||
case result := <-resultChan:
|
||
if result.err != nil {
|
||
return nil, result.err
|
||
}
|
||
return result.conn, nil
|
||
case <-ctx.Done():
|
||
return nil, ctx.Err()
|
||
}
|
||
}
|
||
|
||
// Authenticate 进行RabbitMQ认证
|
||
func (c *RabbitMQConnector) Authenticate(ctx context.Context, conn interface{}, cred *base.Credential) error {
|
||
timeout := time.Duration(common.Timeout) * time.Second
|
||
amqpURL := fmt.Sprintf("amqp://%s:%s@%s:%d/", cred.Username, cred.Password, c.host, c.port)
|
||
|
||
// 结果通道
|
||
type authResult struct {
|
||
conn *amqp.Connection
|
||
err error
|
||
}
|
||
resultChan := make(chan authResult, 1)
|
||
|
||
// 在协程中尝试认证连接
|
||
go func() {
|
||
// 配置连接
|
||
config := amqp.Config{
|
||
Dial: func(network, addr string) (net.Conn, error) {
|
||
dialer := &net.Dialer{Timeout: timeout}
|
||
return dialer.DialContext(ctx, network, addr)
|
||
},
|
||
}
|
||
|
||
// 尝试连接
|
||
authConn, err := amqp.DialConfig(amqpURL, config)
|
||
select {
|
||
case <-ctx.Done():
|
||
if authConn != nil {
|
||
authConn.Close()
|
||
}
|
||
case resultChan <- authResult{authConn, err}:
|
||
}
|
||
}()
|
||
|
||
// 等待认证结果
|
||
select {
|
||
case result := <-resultChan:
|
||
if result.err != nil {
|
||
return fmt.Errorf(i18n.GetText("rabbitmq_auth_failed"), result.err)
|
||
}
|
||
|
||
// 更新连接信息
|
||
if rabbitConn, ok := conn.(*RabbitMQConnection); ok {
|
||
// 关闭旧连接
|
||
if rabbitConn.conn != nil {
|
||
rabbitConn.conn.Close()
|
||
}
|
||
// 更新为认证后的连接
|
||
rabbitConn.conn = result.conn
|
||
rabbitConn.username = cred.Username
|
||
rabbitConn.password = cred.Password
|
||
rabbitConn.amqpURL = amqpURL
|
||
}
|
||
|
||
return nil
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
}
|
||
}
|
||
|
||
// Close 关闭RabbitMQ连接
|
||
func (c *RabbitMQConnector) Close(conn interface{}) error {
|
||
if rabbitConn, ok := conn.(*RabbitMQConnection); ok {
|
||
if rabbitConn.conn != nil {
|
||
rabbitConn.conn.Close()
|
||
}
|
||
return nil
|
||
}
|
||
return fmt.Errorf("无效的RabbitMQ连接类型")
|
||
}
|
||
|
||
// GetConnectionInfo 获取连接信息
|
||
func (conn *RabbitMQConnection) GetConnectionInfo() map[string]interface{} {
|
||
info := map[string]interface{}{
|
||
"protocol": "AMQP",
|
||
"service": "RabbitMQ",
|
||
"info": conn.info,
|
||
}
|
||
|
||
if conn.username != "" {
|
||
info["username"] = conn.username
|
||
info["authenticated"] = true
|
||
}
|
||
|
||
return info
|
||
}
|
||
|
||
// IsAlive 检查连接是否仍然有效
|
||
func (conn *RabbitMQConnection) IsAlive() bool {
|
||
if conn.conn == nil {
|
||
return false
|
||
}
|
||
|
||
return !conn.conn.IsClosed()
|
||
}
|
||
|
||
// GetServerInfo 获取服务器信息
|
||
func (conn *RabbitMQConnection) GetServerInfo() string {
|
||
return conn.info
|
||
} |