fscan/Plugins/services/kafka/plugin.go
ZacharyZcR f097d2812a refactor: 清理项目死代码和未使用函数
- 移除所有未使用的generateCredentials方法
- 删除插件适配器中的过时函数
- 清理MySQL连接器中的无用方法
- 移除Redis利用器中的未调用函数
- 删除遗留加密函数和基础扫描器无用方法
- 完全移除未注册的VNC插件
- 优化代码结构,提升项目可维护性

清理统计: 移除25+个死代码函数,减少400+行无用代码
2025-08-12 11:51:36 +08:00

206 lines
5.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package kafka
import (
"context"
"fmt"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/shadow1ng/fscan/common"
"github.com/shadow1ng/fscan/common/i18n"
"github.com/shadow1ng/fscan/plugins/base"
)
// KafkaPlugin Kafka插件实现
type KafkaPlugin struct {
*base.ServicePlugin
exploiter *KafkaExploiter
}
// NewKafkaPlugin 创建Kafka插件
func NewKafkaPlugin() *KafkaPlugin {
// 插件元数据
metadata := &base.PluginMetadata{
Name: "kafka",
Version: "2.0.0",
Author: "fscan-team",
Description: "Apache Kafka消息队列扫描和利用插件",
Category: "service",
Ports: []int{9092, 9093, 9094}, // Kafka常用端口
Protocols: []string{"tcp"},
Tags: []string{"kafka", "message-queue", "bruteforce", "unauthorized"},
}
// 创建连接器和服务插件
connector := NewKafkaConnector()
servicePlugin := base.NewServicePlugin(metadata, connector)
// 创建Kafka插件
plugin := &KafkaPlugin{
ServicePlugin: servicePlugin,
exploiter: NewKafkaExploiter(),
}
// 设置能力
plugin.SetCapabilities([]base.Capability{
base.CapWeakPassword,
base.CapDataExtraction,
})
return plugin
}
// Scan 重写扫描方法,先检测无认证访问
func (p *KafkaPlugin) Scan(ctx context.Context, info *common.HostInfo) (*base.ScanResult, error) {
// 如果禁用了暴力破解,只进行服务识别
if common.DisableBrute {
return p.performServiceIdentification(ctx, info)
}
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
// 先尝试无认证访问
unauthCred := &base.Credential{Username: "", Password: ""}
unauthResult, err := p.ScanCredential(ctx, info, unauthCred)
if err == nil && unauthResult.Success {
// 无认证访问成功
common.LogSuccess(i18n.GetText("kafka_unauth_access", target))
return &base.ScanResult{
Success: true,
Service: "Kafka",
Credentials: []*base.Credential{unauthCred},
Extra: map[string]interface{}{
"service": "Kafka",
"port": info.Ports,
"unauthorized": true,
"access_type": "no_authentication",
},
}, nil
}
// 执行基础的密码扫描
result, err := p.ServicePlugin.Scan(ctx, info)
if err != nil || !result.Success {
return result, err
}
// 记录成功的弱密码发现
cred := result.Credentials[0]
common.LogSuccess(i18n.GetText("kafka_weak_pwd_success", target, cred.Username, cred.Password))
return result, nil
}
// 已移除未使用的 generateCredentials 方法
// Exploit 使用exploiter执行利用
func (p *KafkaPlugin) Exploit(ctx context.Context, info *common.HostInfo, creds *base.Credential) (*base.ExploitResult, error) {
return p.exploiter.Exploit(ctx, info, creds)
}
// GetExploitMethods 获取利用方法
func (p *KafkaPlugin) GetExploitMethods() []base.ExploitMethod {
return p.exploiter.GetExploitMethods()
}
// IsExploitSupported 检查利用支持
func (p *KafkaPlugin) IsExploitSupported(method base.ExploitType) bool {
return p.exploiter.IsExploitSupported(method)
}
// performServiceIdentification 执行Kafka服务识别-nobr模式
func (p *KafkaPlugin) performServiceIdentification(ctx context.Context, info *common.HostInfo) (*base.ScanResult, error) {
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
// 尝试连接Kafka获取版本信息
kafkaInfo, isKafka := p.identifyKafkaService(ctx, info)
if isKafka {
// 记录服务识别成功
common.LogSuccess(i18n.GetText("kafka_service_identified", target, kafkaInfo))
return &base.ScanResult{
Success: true,
Service: "Kafka",
Banner: kafkaInfo,
Extra: map[string]interface{}{
"service": "Kafka",
"port": info.Ports,
"info": kafkaInfo,
},
}, nil
}
// 如果无法识别为Kafka返回失败
return &base.ScanResult{
Success: false,
Error: fmt.Errorf("无法识别为Kafka服务"),
}, nil
}
// identifyKafkaService 通过连接识别Kafka服务
func (p *KafkaPlugin) identifyKafkaService(ctx context.Context, info *common.HostInfo) (string, bool) {
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
timeout := time.Duration(common.Timeout) * time.Second
config := sarama.NewConfig()
config.Net.DialTimeout = timeout
config.Net.ReadTimeout = timeout
config.Net.WriteTimeout = timeout
config.Net.TLS.Enable = false
config.Version = sarama.V2_0_0_0
brokers := []string{target}
// 尝试创建客户端连接
client, err := sarama.NewClient(brokers, config)
if err != nil {
// 检查错误是否表明这是Kafka服务但认证失败
if strings.Contains(strings.ToLower(err.Error()), "kafka") ||
strings.Contains(strings.ToLower(err.Error()), "sasl") ||
strings.Contains(strings.ToLower(err.Error()), "authentication") {
return fmt.Sprintf("Kafka服务 (需要认证): %v", err), true
}
return "", false
}
defer client.Close()
// 获取集群信息
brokerList := client.Brokers()
if len(brokerList) > 0 {
return fmt.Sprintf("Kafka集群 (Brokers: %d)", len(brokerList)), true
}
return "Kafka服务", true
}
// =============================================================================
// 插件注册
// =============================================================================
// RegisterKafkaPlugin 注册Kafka插件
func RegisterKafkaPlugin() {
factory := base.NewSimplePluginFactory(
&base.PluginMetadata{
Name: "kafka",
Version: "2.0.0",
Author: "fscan-team",
Description: "Apache Kafka消息队列扫描和利用插件",
Category: "service",
Ports: []int{9092, 9093, 9094}, // Kafka常用端口
Protocols: []string{"tcp"},
Tags: []string{"kafka", "message-queue", "bruteforce", "unauthorized"},
},
func() base.Plugin {
return NewKafkaPlugin()
},
)
base.GlobalPluginRegistry.Register("kafka", factory)
}
// 自动注册
func init() {
RegisterKafkaPlugin()
}