fscan/plugins/services/kafka.go
ZacharyZcR af2c92a591 docs: 完善参数配置文档并修复插件兼容性
- 添加完整的参数配置表格和使用示例到README.md
- 修复Kafka插件的协议错误识别逻辑
- 修复RabbitMQ插件的AMQP协议检测
- 完成所有核心参数的功能验证测试
2025-09-02 05:41:22 +00:00

169 lines
4.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"context"
"fmt"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/shadow1ng/fscan/common"
"github.com/shadow1ng/fscan/plugins"
)
// KafkaPlugin is the service plugin for Kafka endpoints. It embeds
// plugins.BasePlugin and adds the Kafka-specific Scan logic defined in this
// file.
type KafkaPlugin struct {
plugins.BasePlugin
}
// NewKafkaPlugin returns a KafkaPlugin whose embedded base plugin is created
// under the name "kafka".
func NewKafkaPlugin() *KafkaPlugin {
	plugin := new(KafkaPlugin)
	plugin.BasePlugin = plugins.NewBasePlugin("kafka")
	return plugin
}
// Scan probes the Kafka endpoint described by info.
//
// When common.DisableBrute is set, only service identification is performed.
// Otherwise the credentials produced by plugins.GenerateCredentials("kafka")
// are tried in order and a successful Result is returned for the first pair
// that testCredential accepts. A cancelled ctx stops the walk between attempts
// and is reported through Result.Error.
func (p *KafkaPlugin) Scan(ctx context.Context, info *common.HostInfo) *plugins.Result {
	if common.DisableBrute {
		return p.identifyService(ctx, info)
	}

	// Every unsuccessful outcome has the same shape; only the error differs.
	failure := func(err error) *plugins.Result {
		return &plugins.Result{
			Success: false,
			Service: "kafka",
			Error:   err,
		}
	}

	candidates := plugins.GenerateCredentials("kafka")
	if len(candidates) == 0 {
		return failure(fmt.Errorf("没有可用的测试凭据"))
	}

	addr := fmt.Sprintf("%s:%s", info.Host, info.Ports)
	for _, candidate := range candidates {
		// Give up before the next attempt once the caller has cancelled.
		if err := ctx.Err(); err != nil {
			return failure(err)
		}

		conn := p.testCredential(ctx, info, candidate)
		if conn == nil {
			continue
		}

		// The connection was only needed to validate the credential.
		conn.Close()
		common.LogSuccess(fmt.Sprintf("Kafka %s %s:%s", addr, candidate.Username, candidate.Password))
		return &plugins.Result{
			Success:  true,
			Service:  "kafka",
			Username: candidate.Username,
			Password: candidate.Password,
		}
	}

	return failure(fmt.Errorf("未发现弱密码"))
}
// testCredential tries to open a sarama client against info.Host:info.Ports
// using cred and returns the connected client, or nil on any failure.
//
// SASL/PLAIN is enabled only when cred carries a username or a password; an
// empty credential results in a plain, unauthenticated connection attempt.
//
// ctx is honoured as far as the sarama API allows: a context that is already
// done short-circuits without touching the network, and a context deadline
// caps the dial/read/write timeouts so a probe does not outlive its caller.
// The caller owns the returned client and must Close it.
func (p *KafkaPlugin) testCredential(ctx context.Context, info *common.HostInfo, cred plugins.Credential) sarama.Client {
	// Do not start a network dial for a scan that was already cancelled.
	if ctx.Err() != nil {
		return nil
	}

	timeout := time.Duration(common.Timeout) * time.Second
	if deadline, ok := ctx.Deadline(); ok {
		remaining := time.Until(deadline)
		if remaining <= 0 {
			return nil
		}
		if remaining < timeout {
			timeout = remaining
		}
	}

	config := sarama.NewConfig()
	config.Net.DialTimeout = timeout
	config.Net.ReadTimeout = timeout
	config.Net.WriteTimeout = timeout
	config.Version = sarama.V2_0_0_0

	if cred.Username != "" || cred.Password != "" {
		config.Net.SASL.Enable = true
		config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
		config.Net.SASL.User = cred.Username
		config.Net.SASL.Password = cred.Password
		config.Net.SASL.Handshake = true
	}

	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
	client, err := sarama.NewClient([]string{target}, config)
	if err != nil {
		// Return a literal nil so callers can rely on a plain nil check.
		return nil
	}
	return client
}
// identifyService fingerprints the endpoint as Kafka without brute forcing.
//
// It first attempts an unauthenticated connection through testCredential. If
// that fails it dials once more to obtain the underlying error and, when
// isKafkaProtocolError recognises it, reports the service as Kafka that
// requires authentication. Any other error yields an unsuccessful Result that
// carries the error. A ctx cancelled between the two probes is reported
// through Result.Error.
func (p *KafkaPlugin) identifyService(ctx context.Context, info *common.HostInfo) *plugins.Result {
	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)

	// Probe 1: connect without any authentication.
	emptyCred := plugins.Credential{Username: "", Password: ""}
	if client := p.testCredential(ctx, info, emptyCred); client != nil {
		defer client.Close()
		banner := "Kafka (无认证)"
		common.LogSuccess(fmt.Sprintf("Kafka %s %s", target, banner))
		return &plugins.Result{
			Success: true,
			Service: "kafka",
			Banner:  banner,
		}
	}

	// Skip the second dial when the scan was cancelled in the meantime.
	if err := ctx.Err(); err != nil {
		return &plugins.Result{
			Success: false,
			Service: "kafka",
			Error:   err,
		}
	}

	// Probe 2: dial again to capture the error and classify it. All three
	// network timeouts are set; leaving read/write unset would fall back to
	// sarama's defaults instead of the user-configured common.Timeout.
	timeout := time.Duration(common.Timeout) * time.Second
	config := sarama.NewConfig()
	config.Net.DialTimeout = timeout
	config.Net.ReadTimeout = timeout
	config.Net.WriteTimeout = timeout
	config.Version = sarama.V2_0_0_0

	client, err := sarama.NewClient([]string{target}, config)
	if err != nil {
		// A Kafka-looking failure still identifies the service.
		if p.isKafkaProtocolError(err) {
			banner := "Kafka (需要认证)"
			common.LogSuccess(fmt.Sprintf("Kafka %s %s", target, banner))
			return &plugins.Result{
				Success: true,
				Service: "kafka",
				Banner:  banner,
			}
		}
		return &plugins.Result{
			Success: false,
			Service: "kafka",
			Error:   fmt.Errorf("无法识别为Kafka服务: %v", err),
		}
	}
	defer client.Close()

	banner := "Kafka"
	common.LogSuccess(fmt.Sprintf("Kafka %s %s", target, banner))
	return &plugins.Result{
		Success: true,
		Service: "kafka",
		Banner:  banner,
	}
}
// isKafkaProtocolError reports whether err looks like a response produced by
// a Kafka endpoint. The check is a case-insensitive substring match of the
// error text against a fixed keyword list. A nil err is never a protocol
// error and returns false.
//
// NOTE(review): the "kafka" and "broker" keywords may also match the generic
// prefix the client library puts on plain connectivity failures — confirm
// that treating those as a Kafka fingerprint is intended.
func (p *KafkaPlugin) isKafkaProtocolError(err error) bool {
	if err == nil {
		return false
	}

	// Common patterns seen in Kafka protocol errors.
	keywords := [...]string{"sasl", "authentication", "kafka", "protocol", "broker"}

	msg := strings.ToLower(err.Error())
	for _, keyword := range keywords {
		if strings.Contains(msg, keyword) {
			return true
		}
	}
	return false
}
// init registers the Kafka plugin factory under the name "kafka" for ports
// 9092, 9093 and 9094.
func init() {
	factory := func() plugins.Plugin {
		return NewKafkaPlugin()
	}
	plugins.RegisterWithPorts("kafka", factory, []int{9092, 9093, 9094})
}