fscan/Plugins/services/kafka/connector.go

114 lines
2.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package kafka
import (
"context"
"fmt"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/shadow1ng/fscan/common"
"github.com/shadow1ng/fscan/plugins/base"
)
// KafkaConnection wraps a sarama client together with the broker address it targets.
type KafkaConnection struct {
// client is the sarama client for this connection; Connect leaves it nil and
// Authenticate assigns it once a client has been created.
client sarama.Client
// target is the broker address in "host:port" form.
target string
}
// KafkaConnector is the Kafka connector implementation.
//
// host and port hold the values copied from the HostInfo passed to the most
// recent Connect call.
type KafkaConnector struct {
	host, port string
}
// NewKafkaConnector allocates a KafkaConnector whose host and port are empty;
// they are filled in by Connect.
func NewKafkaConnector() *KafkaConnector {
	return new(KafkaConnector)
}
// Connect prepares a connection handle for the Kafka service described by info.
//
// No network traffic happens here: the returned *KafkaConnection only carries
// the "host:port" broker address, and its client stays nil until Authenticate
// creates one. ctx is not used by this method, and the returned error is
// always nil.
func (c *KafkaConnector) Connect(ctx context.Context, info *common.HostInfo) (interface{}, error) {
	host, port := info.Host, info.Ports

	// Keep recording the most recent target on the connector, as before.
	c.host = host
	c.port = port

	// Build the address from this call's own arguments instead of reading the
	// fields back: if the same connector serves several targets at once, another
	// Connect call could overwrite c.host/c.port in between and this handle
	// would end up pointing at the wrong broker.
	return &KafkaConnection{
		client: nil, // connected lazily by Authenticate
		target: fmt.Sprintf("%s:%s", host, port),
	}, nil
}
// Authenticate dials the broker recorded in conn using cred and, on success,
// stores the resulting sarama client on the connection.
//
// conn must be a non-nil *KafkaConnection (as returned by Connect); anything
// else yields an error. When cred carries a username or a password, SASL/PLAIN
// is enabled with those values; otherwise (including a nil cred) the broker is
// contacted without SASL. TLS is disabled and all network timeouts are taken
// from common.Timeout, interpreted as seconds.
//
// A client left over from an earlier attempt is closed and cleared first, so a
// failed attempt never leaves an already-closed client on the connection.
//
// Errors whose text mentions "SASL", "authentication" or "credentials" are
// reported as an authentication failure; every other failure is reported as a
// connection failure wrapping the underlying error. ctx is not consulted.
func (c *KafkaConnector) Authenticate(ctx context.Context, conn interface{}, cred *base.Credential) error {
	kafkaConn, ok := conn.(*KafkaConnection)
	if !ok || kafkaConn == nil {
		return fmt.Errorf("无效的连接类型")
	}

	// Drop any client from a previous attempt. The close error is ignored on
	// purpose (best effort); the reference is cleared so that a later Close
	// does not operate on a client that is already closed.
	if kafkaConn.client != nil {
		_ = kafkaConn.client.Close()
		kafkaConn.client = nil
	}

	timeout := time.Duration(common.Timeout) * time.Second
	config := sarama.NewConfig()
	config.Net.DialTimeout = timeout
	config.Net.ReadTimeout = timeout
	config.Net.WriteTimeout = timeout
	config.Net.TLS.Enable = false
	config.Version = sarama.V2_0_0_0

	// Only switch SASL on when some credential was actually supplied.
	if cred != nil && (cred.Username != "" || cred.Password != "") {
		config.Net.SASL.Enable = true
		config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
		config.Net.SASL.User = cred.Username
		config.Net.SASL.Password = cred.Password
		config.Net.SASL.Handshake = true
	}

	// A single NewClient call is sufficient: sarama.NewConsumer builds its
	// consumer on top of NewClient with the same addresses and config, so
	// probing with a consumer first only dials the broker a second time.
	client, err := sarama.NewClient([]string{kafkaConn.target}, config)
	if err == nil {
		kafkaConn.client = client
		return nil
	}

	// Classify by message text: the failure may be reported through a generic
	// client error, so the wording is the only signal available here.
	msg := err.Error()
	for _, marker := range []string{"SASL", "authentication", "credentials"} {
		if strings.Contains(msg, marker) {
			return fmt.Errorf("Kafka认证失败")
		}
	}
	return fmt.Errorf("Kafka连接失败: %w", err)
}
// Close shuts down the sarama client held by conn.
//
// It returns nil when conn is not a *KafkaConnection or when the connection
// holds no client; otherwise it returns whatever the client's Close reports.
func (c *KafkaConnector) Close(conn interface{}) error {
	kc, isKafka := conn.(*KafkaConnection)
	if !isKafka {
		return nil
	}
	if kc.client == nil {
		return nil
	}
	return kc.client.Close()
}