package kafka import ( "context" "fmt" "strings" "time" "github.com/IBM/sarama" "github.com/shadow1ng/fscan/common" "github.com/shadow1ng/fscan/plugins/base" ) // KafkaConnection Kafka连接包装器 type KafkaConnection struct { client sarama.Client target string } // KafkaConnector Kafka连接器实现 type KafkaConnector struct { host string port string } // NewKafkaConnector 创建Kafka连接器 func NewKafkaConnector() *KafkaConnector { return &KafkaConnector{} } // Connect 连接到Kafka服务 func (c *KafkaConnector) Connect(ctx context.Context, info *common.HostInfo) (interface{}, error) { c.host = info.Host c.port = info.Ports target := fmt.Sprintf("%s:%s", c.host, c.port) // 返回连接信息,实际连接在Authenticate时建立 return &KafkaConnection{ client: nil, // 延迟连接 target: target, }, nil } // Authenticate 认证 func (c *KafkaConnector) Authenticate(ctx context.Context, conn interface{}, cred *base.Credential) error { kafkaConn, ok := conn.(*KafkaConnection) if !ok { return fmt.Errorf("无效的连接类型") } // 关闭之前的连接(如果有) if kafkaConn.client != nil { kafkaConn.client.Close() } // 创建新的认证配置 timeout := time.Duration(common.Timeout) * time.Second config := sarama.NewConfig() config.Net.DialTimeout = timeout config.Net.ReadTimeout = timeout config.Net.WriteTimeout = timeout config.Net.TLS.Enable = false config.Version = sarama.V2_0_0_0 // 如果提供了用户名密码,设置SASL认证 if cred.Username != "" || cred.Password != "" { config.Net.SASL.Enable = true config.Net.SASL.Mechanism = sarama.SASLTypePlaintext config.Net.SASL.User = cred.Username config.Net.SASL.Password = cred.Password config.Net.SASL.Handshake = true } brokers := []string{kafkaConn.target} // 尝试作为消费者连接测试 consumer, err := sarama.NewConsumer(brokers, config) if err == nil { consumer.Close() // 创建认证后的客户端 client, clientErr := sarama.NewClient(brokers, config) if clientErr != nil { return fmt.Errorf("创建认证客户端失败: %v", clientErr) } kafkaConn.client = client return nil } // 如果消费者连接失败,尝试作为客户端连接 client, clientErr := sarama.NewClient(brokers, config) if clientErr == nil { kafkaConn.client = client return nil } // 检查认证相关错误 if strings.Contains(err.Error(), "SASL") || strings.Contains(err.Error(), "authentication") || strings.Contains(err.Error(), "credentials") { return fmt.Errorf("Kafka认证失败") } return fmt.Errorf("Kafka连接失败: %v", err) } // Close 关闭连接 func (c *KafkaConnector) Close(conn interface{}) error { if kafkaConn, ok := conn.(*KafkaConnection); ok && kafkaConn.client != nil { return kafkaConn.client.Close() } return nil }