mirror of
https://github.com/shadow1ng/fscan.git
synced 2025-09-14 14:06:44 +08:00
114 lines
2.8 KiB
Go
114 lines
2.8 KiB
Go
package kafka
|
||
|
||
import (
	"context"
	"fmt"
	"net"
	"strings"
	"time"

	"github.com/IBM/sarama"
	"github.com/shadow1ng/fscan/common"
	"github.com/shadow1ng/fscan/plugins/base"
)
|
||
|
||
// KafkaConnection Kafka连接包装器
|
||
type KafkaConnection struct {
|
||
client sarama.Client
|
||
target string
|
||
}
|
||
|
||
// KafkaConnector is the connector implementation for the Kafka service.
//
// host and port hold the values taken from the HostInfo passed to the most
// recent Connect call.
type KafkaConnector struct {
	host, port string
}
|
||
|
||
// NewKafkaConnector 创建Kafka连接器
|
||
func NewKafkaConnector() *KafkaConnector {
|
||
return &KafkaConnector{}
|
||
}
|
||
|
||
// Connect 连接到Kafka服务
|
||
func (c *KafkaConnector) Connect(ctx context.Context, info *common.HostInfo) (interface{}, error) {
|
||
c.host = info.Host
|
||
c.port = info.Ports
|
||
|
||
target := fmt.Sprintf("%s:%s", c.host, c.port)
|
||
|
||
// 返回连接信息,实际连接在Authenticate时建立
|
||
return &KafkaConnection{
|
||
client: nil, // 延迟连接
|
||
target: target,
|
||
}, nil
|
||
}
|
||
|
||
// Authenticate 认证
|
||
func (c *KafkaConnector) Authenticate(ctx context.Context, conn interface{}, cred *base.Credential) error {
|
||
kafkaConn, ok := conn.(*KafkaConnection)
|
||
if !ok {
|
||
return fmt.Errorf("无效的连接类型")
|
||
}
|
||
|
||
// 关闭之前的连接(如果有)
|
||
if kafkaConn.client != nil {
|
||
kafkaConn.client.Close()
|
||
}
|
||
|
||
// 创建新的认证配置
|
||
timeout := time.Duration(common.Timeout) * time.Second
|
||
config := sarama.NewConfig()
|
||
config.Net.DialTimeout = timeout
|
||
config.Net.ReadTimeout = timeout
|
||
config.Net.WriteTimeout = timeout
|
||
config.Net.TLS.Enable = false
|
||
config.Version = sarama.V2_0_0_0
|
||
|
||
// 如果提供了用户名密码,设置SASL认证
|
||
if cred.Username != "" || cred.Password != "" {
|
||
config.Net.SASL.Enable = true
|
||
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
|
||
config.Net.SASL.User = cred.Username
|
||
config.Net.SASL.Password = cred.Password
|
||
config.Net.SASL.Handshake = true
|
||
}
|
||
|
||
brokers := []string{kafkaConn.target}
|
||
|
||
// 尝试作为消费者连接测试
|
||
consumer, err := sarama.NewConsumer(brokers, config)
|
||
if err == nil {
|
||
consumer.Close()
|
||
|
||
// 创建认证后的客户端
|
||
client, clientErr := sarama.NewClient(brokers, config)
|
||
if clientErr != nil {
|
||
return fmt.Errorf("创建认证客户端失败: %v", clientErr)
|
||
}
|
||
kafkaConn.client = client
|
||
return nil
|
||
}
|
||
|
||
// 如果消费者连接失败,尝试作为客户端连接
|
||
client, clientErr := sarama.NewClient(brokers, config)
|
||
if clientErr == nil {
|
||
kafkaConn.client = client
|
||
return nil
|
||
}
|
||
|
||
// 检查认证相关错误
|
||
if strings.Contains(err.Error(), "SASL") ||
|
||
strings.Contains(err.Error(), "authentication") ||
|
||
strings.Contains(err.Error(), "credentials") {
|
||
return fmt.Errorf("Kafka认证失败")
|
||
}
|
||
|
||
return fmt.Errorf("Kafka连接失败: %v", err)
|
||
}
|
||
|
||
// Close 关闭连接
|
||
func (c *KafkaConnector) Close(conn interface{}) error {
|
||
if kafkaConn, ok := conn.(*KafkaConnection); ok && kafkaConn.client != nil {
|
||
return kafkaConn.client.Close()
|
||
}
|
||
return nil
|
||
} |