fscan/plugins/services/kafka.go
ZacharyZcR 98a9a4e1c2 refactor: 重构Elasticsearch和Kafka插件使用统一发包控制
- 修改Elasticsearch插件,在HTTP连接和服务识别中添加发包控制
- 修改Kafka插件,在Sarama客户端连接中添加发包控制和计数
- 统一包计数逻辑,确保TCP连接成功和失败都正确计数
- 保持现有搜索引擎和消息队列检测功能
2025-09-02 11:51:34 +00:00

189 lines
4.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"context"
"fmt"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/shadow1ng/fscan/common"
"github.com/shadow1ng/fscan/plugins"
)
type KafkaPlugin struct {
plugins.BasePlugin
}
func NewKafkaPlugin() *KafkaPlugin {
return &KafkaPlugin{
BasePlugin: plugins.NewBasePlugin("kafka"),
}
}
func (p *KafkaPlugin) Scan(ctx context.Context, info *common.HostInfo) *plugins.Result {
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
if common.DisableBrute {
return p.identifyService(ctx, info)
}
credentials := plugins.GenerateCredentials("kafka")
if len(credentials) == 0 {
return &plugins.Result{
Success: false,
Service: "kafka",
Error: fmt.Errorf("没有可用的测试凭据"),
}
}
for _, cred := range credentials {
// 检查上下文是否已取消
select {
case <-ctx.Done():
return &plugins.Result{
Success: false,
Service: "kafka",
Error: ctx.Err(),
}
default:
}
if client := p.testCredential(ctx, info, cred); client != nil {
client.Close()
common.LogSuccess(fmt.Sprintf("Kafka %s %s:%s", target, cred.Username, cred.Password))
return &plugins.Result{
Success: true,
Service: "kafka",
Username: cred.Username,
Password: cred.Password,
}
}
}
return &plugins.Result{
Success: false,
Service: "kafka",
Error: fmt.Errorf("未发现弱密码"),
}
}
func (p *KafkaPlugin) testCredential(ctx context.Context, info *common.HostInfo, cred plugins.Credential) sarama.Client {
// 检查发包限制
if canSend, reason := common.CanSendPacket(); !canSend {
common.LogError(fmt.Sprintf("Kafka连接 %s:%s 受限: %s", info.Host, info.Ports, reason))
return nil
}
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
timeout := time.Duration(common.Timeout) * time.Second
config := sarama.NewConfig()
config.Net.DialTimeout = timeout
config.Net.ReadTimeout = timeout
config.Net.WriteTimeout = timeout
config.Version = sarama.V2_0_0_0
if cred.Username != "" || cred.Password != "" {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = cred.Username
config.Net.SASL.Password = cred.Password
config.Net.SASL.Handshake = true
}
brokers := []string{target}
client, err := sarama.NewClient(brokers, config)
if err != nil {
common.IncrementTCPFailedPacketCount()
return nil
}
common.IncrementTCPSuccessPacketCount()
return client
}
func (p *KafkaPlugin) identifyService(ctx context.Context, info *common.HostInfo) *plugins.Result {
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
// 检查发包限制
if canSend, reason := common.CanSendPacket(); !canSend {
common.LogError(fmt.Sprintf("Kafka识别 %s 受限: %s", target, reason))
return &plugins.Result{
Success: false,
Service: "kafka",
Error: fmt.Errorf("发包受限: %s", reason),
}
}
// 尝试无认证连接
emptyCred := plugins.Credential{Username: "", Password: ""}
client := p.testCredential(ctx, info, emptyCred)
if client != nil {
defer client.Close()
banner := "Kafka (无认证)"
common.LogSuccess(fmt.Sprintf("Kafka %s %s", target, banner))
return &plugins.Result{
Success: true,
Service: "kafka",
Banner: banner,
}
}
// 尝试简单认证检测
config := sarama.NewConfig()
config.Net.DialTimeout = time.Duration(common.Timeout) * time.Second
config.Version = sarama.V2_0_0_0
brokers := []string{target}
client, err := sarama.NewClient(brokers, config)
if err != nil {
common.IncrementTCPFailedPacketCount()
// 如果连接失败尝试检查是否是Kafka协议错误
if p.isKafkaProtocolError(err) {
banner := "Kafka (需要认证)"
common.LogSuccess(fmt.Sprintf("Kafka %s %s", target, banner))
return &plugins.Result{
Success: true,
Service: "kafka",
Banner: banner,
}
}
return &plugins.Result{
Success: false,
Service: "kafka",
Error: fmt.Errorf("无法识别为Kafka服务: %v", err),
}
}
common.IncrementTCPSuccessPacketCount()
defer client.Close()
banner := "Kafka"
common.LogSuccess(fmt.Sprintf("Kafka %s %s", target, banner))
return &plugins.Result{
Success: true,
Service: "kafka",
Banner: banner,
}
}
// isKafkaProtocolError 检查错误是否表示Kafka协议响应
func (p *KafkaPlugin) isKafkaProtocolError(err error) bool {
errStr := strings.ToLower(err.Error())
// Kafka常见的协议错误模式
return strings.Contains(errStr, "sasl") ||
strings.Contains(errStr, "authentication") ||
strings.Contains(errStr, "kafka") ||
strings.Contains(errStr, "protocol") ||
strings.Contains(errStr, "broker")
}
func init() {
plugins.RegisterWithPorts("kafka", func() plugins.Plugin {
return NewKafkaPlugin()
}, []int{9092, 9093, 9094})
}